Skip to content

Commit 3da31b0

Browse files
committed
[SPARK-49090][CORE] Support JWSFilter
### What changes were proposed in this pull request? This PR aims to support `JWSFilter` which is a servlet filter that requires `JWS`, a cryptographically signed JSON Web Token, in the header via `spark.ui.filters` configuration. - spark.ui.filters=org.apache.spark.ui.JWSFilter - spark.org.apache.spark.ui.JWSFilter.param.key=YOUR-BASE64URL-ENCODED-KEY To simply put, `JWSFilter` will check the following for all requests. - The HTTP request should have `Authorization: Bearer <jws>` header. - `<jws>` is a string with three fields, `<header>.<payload>.<signature>`. - `<header>` is supposed to be a base64url-encoded string of `{"alg":"HS256","typ":"JWT"}`. - `<payload>` is a base64url-encoded string of fully-user-defined content. - `<signature>` is a signature based on `<header>.<payload>` and a user-provided key parameter. For example, the value of `<header>` will be `eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9` always and the value of `payload` can be `e30` if the payload is empty, `{}`. The `<signature>` part is changed by the shared value of `spark.org.apache.spark.ui.JWSFilter.param.key` between the server and client. ``` jshell> java.util.Base64.getUrlEncoder().encodeToString("{\"alg\":\"HS256\",\"typ\":\"JWT\"}".getBytes()) $2 ==> "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" jshell> java.util.Base64.getUrlEncoder().encodeToString("{}".getBytes()) $3 ==> "e30=" ``` ### Why are the changes needed? To provide a little better security on WebUI consistently including Spark Standalone Clusters. For example, **SETTING** ``` $ jshell | Welcome to JShell -- Version 17.0.12 | For an introduction type: /help intro jshell> java.util.Base64.getUrlEncoder().encodeToString("Visit https://spark.apache.org to download Apache Spark.".getBytes()) $1 ==> "VmlzaXQgaHR0cHM6Ly9zcGFyay5hcGFjaGUub3JnIHRvIGRvd25sb2FkIEFwYWNoZSBTcGFyay4=" ``` ``` $ cat conf/spark-defaults.conf spark.ui.filters org.apache.spark.ui.JWSFilter spark.org.apache.spark.ui.JWSFilter.param.key VmlzaXQgaHR0cHM6Ly9zcGFyay5hcGFjaGUub3JnIHRvIGRvd25sb2FkIEFwYWNoZSBTcGFyay4= ``` **SPARK-SHELL** ``` $ build/sbt package $ cp jjwt-impl-0.12.6.jar assembly/target/scala-2.13/jars $ cp jjwt-jackson-0.12.6.jar assembly/target/scala-2.13/jars $ bin/spark-shell ``` Without JWS (ErrorCode: 403 Forbidden) ``` $ curl -v http://localhost:4040/ * Host localhost:4040 was resolved. * IPv6: ::1 * IPv4: 127.0.0.1 * Trying [::1]:4040... * connect to ::1 port 4040 from ::1 port 61313 failed: Connection refused * Trying 127.0.0.1:4040... * Connected to localhost (127.0.0.1) port 4040 > GET / HTTP/1.1 > Host: localhost:4040 > User-Agent: curl/8.7.1 > Accept: */* > * Request completely sent off < HTTP/1.1 403 Forbidden < Date: Fri, 02 Aug 2024 01:27:23 GMT < Cache-Control: must-revalidate,no-cache,no-store < Content-Type: text/html;charset=iso-8859-1 < Content-Length: 472 < <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/> <title>Error 403 Authorization header is missing.</title> </head> <body><h2>HTTP ERROR 403 Authorization header is missing.</h2> <table> <tr><th>URI:</th><td>/</td></tr> <tr><th>STATUS:</th><td>403</td></tr> <tr><th>MESSAGE:</th><td>Authorization header is missing.</td></tr> <tr><th>SERVLET:</th><td>org.apache.spark.ui.JettyUtils$$anon$2-3b39bee2</td></tr> </table> </body> </html> * Connection #0 to host localhost left intact ``` With JWS, ``` $ curl -v -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4EKWlOkobpaAPR0J4BE0cPQ-ZD1tRQKLZp1vtE7upPw" http://localhost:4040/ * Host localhost:4040 was resolved. * IPv6: ::1 * IPv4: 127.0.0.1 * Trying [::1]:4040... * connect to ::1 port 4040 from ::1 port 61311 failed: Connection refused * Trying 127.0.0.1:4040... * Connected to localhost (127.0.0.1) port 4040 > GET / HTTP/1.1 > Host: localhost:4040 > User-Agent: curl/8.7.1 > Accept: */* > Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4EKWlOkobpaAPR0J4BE0cPQ-ZD1tRQKLZp1vtE7upPw > * Request completely sent off < HTTP/1.1 302 Found < Date: Fri, 02 Aug 2024 01:27:01 GMT < Cache-Control: no-cache, no-store, must-revalidate < X-Frame-Options: SAMEORIGIN < X-XSS-Protection: 1; mode=block < X-Content-Type-Options: nosniff < Location: http://localhost:4040/jobs/ < Content-Length: 0 < * Connection #0 to host localhost left intact ``` **SPARK MASTER** Without JWS (ErrorCode: 403 Forbidden) ``` $ curl -v http://localhost:8080/json/ * Host localhost:8080 was resolved. * IPv6: ::1 * IPv4: 127.0.0.1 * Trying [::1]:8080... * connect to ::1 port 8080 from ::1 port 61331 failed: Connection refused * Trying 127.0.0.1:8080... * Connected to localhost (127.0.0.1) port 8080 > GET /json/ HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/8.7.1 > Accept: */* > * Request completely sent off < HTTP/1.1 403 Forbidden < Date: Fri, 02 Aug 2024 01:34:03 GMT < Cache-Control: must-revalidate,no-cache,no-store < Content-Type: text/html;charset=iso-8859-1 < Content-Length: 477 < <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/> <title>Error 403 Authorization header is missing.</title> </head> <body><h2>HTTP ERROR 403 Authorization header is missing.</h2> <table> <tr><th>URI:</th><td>/json/</td></tr> <tr><th>STATUS:</th><td>403</td></tr> <tr><th>MESSAGE:</th><td>Authorization header is missing.</td></tr> <tr><th>SERVLET:</th><td>org.apache.spark.ui.JettyUtils$$anon$1-6c52101f</td></tr> </table> </body> </html> * Connection #0 to host localhost left intact ``` With JWS ``` $ curl -v -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4EKWlOkobpaAPR0J4BE0cPQ-ZD1tRQKLZp1vtE7upPw" http://localhost:8080/json/ * Host localhost:8080 was resolved. * IPv6: ::1 * IPv4: 127.0.0.1 * Trying [::1]:8080... * connect to ::1 port 8080 from ::1 port 61329 failed: Connection refused * Trying 127.0.0.1:8080... * Connected to localhost (127.0.0.1) port 8080 > GET /json/ HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/8.7.1 > Accept: */* > Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4EKWlOkobpaAPR0J4BE0cPQ-ZD1tRQKLZp1vtE7upPw > * Request completely sent off < HTTP/1.1 200 OK < Date: Fri, 02 Aug 2024 01:33:10 GMT < Cache-Control: no-cache, no-store, must-revalidate < X-Frame-Options: SAMEORIGIN < X-XSS-Protection: 1; mode=block < X-Content-Type-Options: nosniff < Content-Type: text/json;charset=utf-8 < Vary: Accept-Encoding < Content-Length: 320 < { "url" : "spark://M3-Max.local:7077", "workers" : [ ], "aliveworkers" : 0, "cores" : 0, "coresused" : 0, "memory" : 0, "memoryused" : 0, "resources" : [ ], "resourcesused" : [ ], "activeapps" : [ ], "completedapps" : [ ], "activedrivers" : [ ], "completeddrivers" : [ ], "status" : "ALIVE" * Connection #0 to host localhost left intact }% ``` ### Does this PR introduce _any_ user-facing change? No, this is a new filter. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47575 from dongjoon-hyun/SPARK-49090. Lead-authored-by: Dongjoon Hyun <dhyun@apple.com> Co-authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 080e7eb commit 3da31b0

File tree

5 files changed

+211
-0
lines changed

5 files changed

+211
-0
lines changed

LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ io.fabric8:kubernetes-model-scheduling
267267
io.fabric8:kubernetes-model-storageclass
268268
io.fabric8:zjsonpatch
269269
io.github.java-diff-utils:java-diff-utils
270+
io.jsonwebtoken:jjwt-api
270271
io.netty:netty-all
271272
io.netty:netty-buffer
272273
io.netty:netty-codec

core/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,23 @@
118118
<groupId>org.apache.zookeeper</groupId>
119119
<artifactId>zookeeper</artifactId>
120120
</dependency>
121+
<dependency>
122+
<groupId>io.jsonwebtoken</groupId>
123+
<artifactId>jjwt-api</artifactId>
124+
<version>0.12.6</version>
125+
</dependency>
126+
<dependency>
127+
<groupId>io.jsonwebtoken</groupId>
128+
<artifactId>jjwt-impl</artifactId>
129+
<version>0.12.6</version>
130+
<scope>test</scope>
131+
</dependency>
132+
<dependency>
133+
<groupId>io.jsonwebtoken</groupId>
134+
<artifactId>jjwt-jackson</artifactId>
135+
<version>0.12.6</version>
136+
<scope>test</scope>
137+
</dependency>
121138

122139
<!-- Jetty dependencies promoted to compile here so they are shaded
123140
and inlined into spark-core jar -->
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ui
19+
20+
import javax.crypto.SecretKey
21+
22+
import io.jsonwebtoken.{JwtException, Jwts}
23+
import io.jsonwebtoken.io.Decoders
24+
import io.jsonwebtoken.security.Keys
25+
import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletRequest, ServletResponse}
26+
import jakarta.servlet.http.{HttpServletRequest, HttpServletResponse}
27+
28+
/**
29+
* A servlet filter that requires JWS, a cryptographically signed JSON Web Token, in the header.
30+
*
31+
* Like the other UI filters, the following configurations are required to use this filter.
32+
* {{{
33+
* - spark.ui.filters=org.apache.spark.ui.JWSFilter
34+
* - spark.org.apache.spark.ui.JWSFilter.param.key=BASE64URL-ENCODED-YOUR-PROVIDED-KEY
35+
* }}}
36+
* The HTTP request should have {@code Authorization: Bearer <jws>} header.
37+
* {{{
38+
* - <jws> is a string with three fields, '<header>.<payload>.<signature>'.
39+
* - <header> is supposed to be a base64url-encoded string of '{"alg":"HS256","typ":"JWT"}'.
40+
* - <payload> is a base64url-encoded string of fully-user-defined content.
41+
* - <signature> is a signature based on '<header>.<payload>' and a user-provided key parameter.
42+
* }}}
43+
*/
44+
private class JWSFilter extends Filter {
45+
private val AUTHORIZATION = "Authorization"
46+
47+
private var key: SecretKey = null
48+
49+
/**
50+
* Load and validate the configurtions:
51+
* - IllegalArgumentException will happen if the user didn't provide this argument
52+
* - WeakKeyException will happen if the user-provided value is insufficient
53+
*/
54+
override def init(config: FilterConfig): Unit = {
55+
key = Keys.hmacShaKeyFor(Decoders.BASE64URL.decode(config.getInitParameter("key")));
56+
}
57+
58+
override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
59+
val hreq = req.asInstanceOf[HttpServletRequest]
60+
val hres = res.asInstanceOf[HttpServletResponse]
61+
hres.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
62+
63+
try {
64+
val header = hreq.getHeader(AUTHORIZATION)
65+
header match {
66+
case null =>
67+
hres.sendError(HttpServletResponse.SC_FORBIDDEN, s"${AUTHORIZATION} header is missing.")
68+
case s"Bearer $token" =>
69+
val claims = Jwts.parser().verifyWith(key).build().parseSignedClaims(token)
70+
chain.doFilter(req, res)
71+
case _ =>
72+
hres.sendError(HttpServletResponse.SC_FORBIDDEN, s"Malformed ${AUTHORIZATION} header.")
73+
}
74+
} catch {
75+
case e: JwtException =>
76+
// We intentionally don't expose the detail of JwtException here
77+
hres.sendError(HttpServletResponse.SC_FORBIDDEN, "JWT Validate Fail")
78+
}
79+
}
80+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ui
19+
20+
import java.util.{Base64, HashMap => JHashMap}
21+
22+
import scala.jdk.CollectionConverters._
23+
24+
import jakarta.servlet.{FilterChain, FilterConfig, ServletContext}
25+
import jakarta.servlet.http.{HttpServletRequest, HttpServletResponse}
26+
import org.mockito.ArgumentMatchers.{any, eq => meq}
27+
import org.mockito.Mockito.{mock, times, verify, when}
28+
29+
import org.apache.spark._
30+
31+
class JWSFilterSuite extends SparkFunSuite {
32+
// {"alg":"HS256","typ":"JWT"} => eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9, {} => e30
33+
private val TOKEN =
34+
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.4EKWlOkobpaAPR0J4BE0cPQ-ZD1tRQKLZp1vtE7upPw"
35+
36+
private val TEST_KEY = Base64.getUrlEncoder.encodeToString(
37+
"Visit https://spark.apache.org to download Apache Spark.".getBytes())
38+
39+
test("Should fail when a parameter is missing") {
40+
val filter = new JWSFilter()
41+
val params = new JHashMap[String, String]
42+
val m = intercept[IllegalArgumentException] {
43+
filter.init(new DummyFilterConfig(params))
44+
}.getMessage()
45+
assert(m.contains("Decode argument cannot be null"))
46+
}
47+
48+
test("Succeed to initialize") {
49+
val filter = new JWSFilter()
50+
val params = new JHashMap[String, String]
51+
params.put("key", TEST_KEY)
52+
filter.init(new DummyFilterConfig(params))
53+
}
54+
55+
test("Should response with SC_FORBIDDEN when it cannot verify JWS") {
56+
val req = mockRequest()
57+
val res = mock(classOf[HttpServletResponse])
58+
val chain = mock(classOf[FilterChain])
59+
60+
val filter = new JWSFilter()
61+
val params = new JHashMap[String, String]
62+
params.put("key", TEST_KEY)
63+
val conf = new DummyFilterConfig(params)
64+
filter.init(conf)
65+
66+
// 'Authorization' header is missing
67+
filter.doFilter(req, res, chain)
68+
verify(res).sendError(meq(HttpServletResponse.SC_FORBIDDEN),
69+
meq("Authorization header is missing."))
70+
verify(chain, times(0)).doFilter(any(), any())
71+
72+
// The value of Authorization field is not 'Bearer <token>' style.
73+
when(req.getHeader("Authorization")).thenReturn("Invalid")
74+
filter.doFilter(req, res, chain)
75+
verify(res).sendError(meq(HttpServletResponse.SC_FORBIDDEN),
76+
meq("Malformed Authorization header."))
77+
verify(chain, times(0)).doFilter(any(), any())
78+
}
79+
80+
test("Should succeed on valid JWS") {
81+
val req = mockRequest()
82+
val res = mock(classOf[HttpServletResponse])
83+
val chain = mock(classOf[FilterChain])
84+
85+
val filter = new JWSFilter()
86+
val params = new JHashMap[String, String]
87+
params.put("key", TEST_KEY)
88+
val conf = new DummyFilterConfig(params)
89+
filter.init(conf)
90+
91+
when(req.getHeader("Authorization")).thenReturn(s"Bearer $TOKEN")
92+
filter.doFilter(req, res, chain)
93+
verify(chain, times(1)).doFilter(any(), any())
94+
}
95+
96+
private def mockRequest(params: Map[String, Array[String]] = Map()): HttpServletRequest = {
97+
val req = mock(classOf[HttpServletRequest])
98+
when(req.getParameterMap()).thenReturn(params.asJava)
99+
req
100+
}
101+
102+
class DummyFilterConfig (val map: java.util.Map[String, String]) extends FilterConfig {
103+
override def getFilterName: String = "dummy"
104+
105+
override def getInitParameter(arg0: String): String = map.get(arg0)
106+
107+
override def getInitParameterNames: java.util.Enumeration[String] =
108+
java.util.Collections.enumeration(map.keySet)
109+
110+
override def getServletContext: ServletContext = null
111+
}
112+
}

dev/deps/spark-deps-hadoop-3-hive-2.3

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ jersey-server/3.0.12//jersey-server-3.0.12.jar
138138
jettison/1.5.4//jettison-1.5.4.jar
139139
jetty-util-ajax/11.0.21//jetty-util-ajax-11.0.21.jar
140140
jetty-util/11.0.21//jetty-util-11.0.21.jar
141+
jjwt-api/0.12.6//jjwt-api-0.12.6.jar
141142
jline/2.14.6//jline-2.14.6.jar
142143
jline/3.25.1//jline-3.25.1.jar
143144
jna/5.14.0//jna-5.14.0.jar

0 commit comments

Comments
 (0)