[SPARK-48238][BUILD][YARN] Replace YARN AmIpFilter with a forked implementation

### What changes were proposed in this pull request?

This PR replaces AmIpFilter with a forked implementation and removes the `hadoop-yarn-server-web-proxy` dependency.

### Why are the changes needed?

SPARK-47118 upgraded Spark's built-in Jetty from 10 to 11 and migrated from `javax.servlet` to `jakarta.servlet`, which breaks Spark on YARN.

```
Caused by: java.lang.IllegalStateException: class org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter is not a jakarta.servlet.Filter
    at org.sparkproject.jetty.servlet.FilterHolder.doStart(FilterHolder.java:99)
    at org.sparkproject.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.sparkproject.jetty.servlet.ServletHandler.lambda$initialize$2(ServletHandler.java:724)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at org.sparkproject.jetty.servlet.ServletHandler.initialize(ServletHandler.java:749)
    ... 38 more
```
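
For illustration, the failure boils down to a type check: Jetty 11 is built against `jakarta.servlet`, so a filter class that only implements `javax.servlet.Filter` is rejected when the servlet handler starts. A minimal, self-contained sketch (the class below is hypothetical, not Spark's actual wiring):

```java
// Hypothetical illustration of the incompatibility, not Spark's actual code.
// Jetty 11's servlet layer requires filters to implement jakarta.servlet.Filter;
// a javax.servlet-only filter such as Hadoop's AmIpFilter fails this check.
public class JakartaFilterCheck {
  static void requireJakartaFilter(Class<?> filterClass) {
    if (!jakarta.servlet.Filter.class.isAssignableFrom(filterClass)) {
      throw new IllegalStateException(
          "class " + filterClass.getName() + " is not a jakarta.servlet.Filter");
    }
  }

  public static void main(String[] args) {
    // The forked filter added by this PR implements jakarta.servlet.Filter and
    // passes; Hadoop's javax-based AmIpFilter would throw the exception above.
    requireJakartaFilter(org.apache.spark.deploy.yarn.AmIpFilter.class);
  }
}
```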

During the investigation, I found a comment in #31642:

> Agree that in the long term we should either: 1) consider to re-implement the logic in Spark which allows us to get away from server-side dependency in Hadoop ...

This should be a simple and clean way to address the exact issue: we no longer need to wait for Hadoop's `jakarta.servlet` migration, and it also strips a Hadoop dependency.

### Does this PR introduce _any_ user-facing change?

No. This restores the bootstrap of Spark applications in YARN mode, keeping the same behavior as Spark 3.5 and earlier versions.

### How was this patch tested?

UTs are added (ported with reference to `org.apache.hadoop.yarn.server.webproxy.amfilter.TestAmFilter`).
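
As a reference for the kind of test involved, here is a minimal sketch (not the actual test code; the stub class and host names are illustrative): it feeds the forked filter a `FilterConfig` carrying the proxy init parameters and checks the resolved redirect URL.

```java
// Sketch only: exercise the forked AmIpFilter with a stub FilterConfig.
// Stub class and host names are illustrative, not taken from the real tests.
import jakarta.servlet.FilterConfig;
import jakarta.servlet.ServletContext;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.deploy.yarn.AmIpFilter;

public class AmIpFilterSketch {
  // Minimal FilterConfig backed by a map of init parameters.
  static FilterConfig configOf(Map<String, String> params) {
    return new FilterConfig() {
      @Override public String getFilterName() { return "AmIpFilter"; }
      @Override public ServletContext getServletContext() { return null; }
      @Override public String getInitParameter(String name) { return params.get(name); }
      @Override public Enumeration<String> getInitParameterNames() {
        return Collections.enumeration(params.keySet());
      }
    };
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> params = new HashMap<>();
    params.put(AmIpFilter.PROXY_HOSTS, "rm1.example.com");
    params.put(AmIpFilter.PROXY_URI_BASES,
        "http://rm1.example.com:8088/proxy/application_1716005503866_0001");

    AmIpFilter filter = new AmIpFilter();
    filter.init(configOf(params));
    // With a single proxy URI base, findRedirectUrl() returns it directly.
    System.out.println(filter.findRedirectUrl());
  }
}
```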

I tested it in a YARN cluster.

Spark successfully started.
```
root@hadoop-master1:/opt/spark-SPARK-48238# JAVA_HOME=/opt/openjdk-17 bin/spark-sql --conf spark.yarn.appMasterEnv.JAVA_HOME=/opt/openjdk-17 --conf spark.executorEnv.JAVA_HOME=/opt/openjdk-17
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-05-18 04:11:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-05-18 04:11:44 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive} is set, falling back to uploading libraries under SPARK_HOME.
Spark Web UI available at http://hadoop-master1.orb.local:4040
Spark master: yarn, Application Id: application_1716005503866_0001
spark-sql (default)> select version();
4.0.0 4ddc230
Time taken: 1.707 seconds, Fetched 1 row(s)
spark-sql (default)>
```

When accessing `http://hadoop-master1.orb.local:4040`, it redirects to `http://hadoop-master1.orb.local:8088/proxy/redirect/application_1716005503866_0001/`, and the UI looks correct.
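
That URL comes from the forked filter's `doFilter`: for requests not originating from a known proxy address, it appends the request URI to the redirect URL from `findRedirectUrl()` and inserts `/redirect` right after `/proxy`, so the RM web proxy knows the request arrived via a redirect. A standalone sketch of just that string manipulation (values taken from the run above):

```java
// Standalone sketch of the redirect URL construction done in AmIpFilter.doFilter;
// this mirrors only the string handling, not the filter's full logic.
public class RedirectSketch {
  public static void main(String[] args) {
    String proxyUriBase =
        "http://hadoop-master1.orb.local:8088/proxy/application_1716005503866_0001";
    String requestUri = "/"; // URI of the original request to the AM web UI

    StringBuilder redirect = new StringBuilder(proxyUriBase);
    redirect.append(requestUri);
    int insertPoint = redirect.indexOf("/proxy");
    if (insertPoint >= 0) {
      // "/redirect" becomes the path component right after "/proxy" so the
      // RM web proxy knows this request was a redirect.
      redirect.insert(insertPoint + "/proxy".length(), "/redirect");
    }
    // Prints:
    // http://hadoop-master1.orb.local:8088/proxy/redirect/application_1716005503866_0001/
    System.out.println(redirect);
  }
}
```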

<img width="1474" alt="image" src="https://github.com/apache/spark/assets/26535726/8500fc83-48c5-4603-8d05-37855f0308ae">

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46611 from pan3793/SPARK-48238.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
pan3793 authored and LuciferYang committed May 20, 2024
1 parent 6fcdaab commit 4fc2910
Showing 10 changed files with 798 additions and 84 deletions.
4 changes: 0 additions & 4 deletions assembly/pom.xml
@@ -136,10 +136,6 @@
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
1 change: 0 additions & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -80,7 +80,6 @@ hadoop-client-runtime/3.4.0//hadoop-client-runtime-3.4.0.jar
hadoop-cloud-storage/3.4.0//hadoop-cloud-storage-3.4.0.jar
hadoop-huaweicloud/3.4.0//hadoop-huaweicloud-3.4.0.jar
hadoop-shaded-guava/1.2.0//hadoop-shaded-guava-1.2.0.jar
hadoop-yarn-server-web-proxy/3.4.0//hadoop-yarn-server-web-proxy-3.4.0.jar
hive-beeline/2.3.10//hive-beeline-2.3.10.jar
hive-cli/2.3.10//hive-cli-2.3.10.jar
hive-common/2.3.10//hive-common-2.3.10.jar
77 changes: 0 additions & 77 deletions pom.xml
@@ -1769,83 +1769,6 @@
<version>${yarn.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>${yarn.version}</version>
<scope>${hadoop.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</exclusion>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- Hadoop-3.x -->
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
<exclusion>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
239 changes: 239 additions & 0 deletions (new file: `AmIpFilter.java`, package `org.apache.spark.deploy.yarn`)
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

import jakarta.servlet.*;
import jakarta.servlet.http.Cookie;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;

// This class is copied from Hadoop 3.4.0
// org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
//
// Modification:
// Migrate from javax.servlet to jakarta.servlet
// Copy constant string definitions to strip external dependency
// - RM_HA_URLS
// - PROXY_USER_COOKIE_NAME
@Public
public class AmIpFilter implements Filter {
private static final SparkLogger LOG = SparkLoggerFactory.getLogger(AmIpFilter.class);

@Deprecated
public static final String PROXY_HOST = "PROXY_HOST";
@Deprecated
public static final String PROXY_URI_BASE = "PROXY_URI_BASE";
public static final String PROXY_HOSTS = "PROXY_HOSTS";
public static final String PROXY_HOSTS_DELIMITER = ",";
public static final String PROXY_URI_BASES = "PROXY_URI_BASES";
public static final String PROXY_URI_BASES_DELIMITER = ",";
private static final String PROXY_PATH = "/proxy";
// RM_HA_URLS is defined in AmFilterInitializer in the original Hadoop code
private static final String RM_HA_URLS = "RM_HA_URLS";
// PROXY_USER_COOKIE_NAME is defined in WebAppProxyServlet in the original Hadoop code
public static final String PROXY_USER_COOKIE_NAME = "proxy-user";
// update the proxy IP list about every 5 min
private static long updateInterval = TimeUnit.MINUTES.toMillis(5);

private String[] proxyHosts;
private Set<String> proxyAddresses = null;
private long lastUpdate;
@VisibleForTesting
Map<String, String> proxyUriBases;
String[] rmUrls = null;

@Override
public void init(FilterConfig conf) throws ServletException {
// Maintain for backwards compatibility
if (conf.getInitParameter(PROXY_HOST) != null
&& conf.getInitParameter(PROXY_URI_BASE) != null) {
proxyHosts = new String[]{conf.getInitParameter(PROXY_HOST)};
proxyUriBases = new HashMap<>(1);
proxyUriBases.put("dummy", conf.getInitParameter(PROXY_URI_BASE));
} else {
proxyHosts = conf.getInitParameter(PROXY_HOSTS)
.split(PROXY_HOSTS_DELIMITER);

String[] proxyUriBasesArr = conf.getInitParameter(PROXY_URI_BASES)
.split(PROXY_URI_BASES_DELIMITER);
proxyUriBases = new HashMap<>(proxyUriBasesArr.length);
for (String proxyUriBase : proxyUriBasesArr) {
try {
URL url = new URL(proxyUriBase);
proxyUriBases.put(url.getHost() + ":" + url.getPort(), proxyUriBase);
} catch(MalformedURLException e) {
LOG.warn(proxyUriBase + " does not appear to be a valid URL", e);
}
}
}

if (conf.getInitParameter(RM_HA_URLS) != null) {
rmUrls = conf.getInitParameter(RM_HA_URLS).split(",");
}
}

protected Set<String> getProxyAddresses() throws ServletException {
long now = Time.monotonicNow();
synchronized(this) {
if (proxyAddresses == null || (lastUpdate + updateInterval) <= now) {
proxyAddresses = new HashSet<>();
for (String proxyHost : proxyHosts) {
try {
for (InetAddress add : InetAddress.getAllByName(proxyHost)) {
LOG.debug("proxy address is: {}", add.getHostAddress());
proxyAddresses.add(add.getHostAddress());
}
lastUpdate = now;
} catch (UnknownHostException e) {
LOG.warn("Could not locate " + proxyHost + " - skipping", e);
}
}
if (proxyAddresses.isEmpty()) {
throw new ServletException("Could not locate any of the proxy hosts");
}
}
return proxyAddresses;
}
}

@Override
public void destroy() {
// Empty
}

@Override
public void doFilter(ServletRequest req, ServletResponse resp,
FilterChain chain) throws IOException, ServletException {
ProxyUtils.rejectNonHttpRequests(req);

HttpServletRequest httpReq = (HttpServletRequest)req;
HttpServletResponse httpResp = (HttpServletResponse)resp;

LOG.debug("Remote address for request is: {}", httpReq.getRemoteAddr());

if (!getProxyAddresses().contains(httpReq.getRemoteAddr())) {
StringBuilder redirect = new StringBuilder(findRedirectUrl());

redirect.append(httpReq.getRequestURI());

int insertPoint = redirect.indexOf(PROXY_PATH);

if (insertPoint >= 0) {
// Add /redirect as the second component of the path so that the RM web
// proxy knows that this request was a redirect.
insertPoint += PROXY_PATH.length();
redirect.insert(insertPoint, "/redirect");
}
// add the query parameters on the redirect if there were any
String queryString = httpReq.getQueryString();
if (queryString != null && !queryString.isEmpty()) {
redirect.append("?");
redirect.append(queryString);
}

ProxyUtils.sendRedirect(httpReq, httpResp, redirect.toString());
} else {
String user = null;

if (httpReq.getCookies() != null) {
for (Cookie c: httpReq.getCookies()) {
if (PROXY_USER_COOKIE_NAME.equals(c.getName())){
user = c.getValue();
break;
}
}
}
if (user == null) {
LOG.debug("Could not find {} cookie, so user will not be set",
PROXY_USER_COOKIE_NAME);

chain.doFilter(req, resp);
} else {
AmIpPrincipal principal = new AmIpPrincipal(user);
ServletRequest requestWrapper = new AmIpServletRequestWrapper(httpReq,
principal);

chain.doFilter(requestWrapper, resp);
}
}
}

@VisibleForTesting
public String findRedirectUrl() throws ServletException {
String addr = null;
if (proxyUriBases.size() == 1) {
// external proxy or not RM HA
addr = proxyUriBases.values().iterator().next();
} else if (rmUrls != null) {
for (String url : rmUrls) {
String host = proxyUriBases.get(url);
if (isValidUrl(host)) {
addr = host;
break;
}
}
}

if (addr == null) {
throw new ServletException(
"Could not determine the proxy server for redirection");
}
return addr;
}

@VisibleForTesting
public boolean isValidUrl(String url) {
boolean isValid = false;
try {
HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
conn.connect();
isValid = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
// If security is enabled, any valid RM which can give 401 Unauthorized is
// good enough to access. Since AM doesn't have enough credential, auth
// cannot be completed and hence 401 is fine in such case.
if (!isValid && UserGroupInformation.isSecurityEnabled()) {
isValid = (conn.getResponseCode() == HttpURLConnection.HTTP_UNAUTHORIZED)
|| (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN);
return isValid;
}
} catch (Exception e) {
LOG.warn("Failed to connect to " + url + ": " + e.toString());
}
return isValid;
}

@VisibleForTesting
protected static void setUpdateInterval(long updateInterval) {
AmIpFilter.updateInterval = updateInterval;
}
}
35 changes: 35 additions & 0 deletions (new file: `AmIpPrincipal.java`, package `org.apache.spark.deploy.yarn`)
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn;

import java.security.Principal;

// This class is copied from Hadoop 3.4.0
// org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpPrincipal
public class AmIpPrincipal implements Principal {
private final String name;

public AmIpPrincipal(String name) {
this.name = name;
}

@Override
public String getName() {
return name;
}
}