[SPARK-35878][CORE] Add fs.s3a.endpoint if unset and fs.s3a.endpoint.region is null

### What changes were proposed in this pull request?

This patches the Hadoop configuration so that fs.s3a.endpoint is set to
s3.amazonaws.com if neither it nor fs.s3a.endpoint.region is set.

This stops S3A filesystem creation from failing with the error
"Unable to find a region via the region provider chain."
in some non-EC2 deployments.

See: HADOOP-17771.

When Spark options are propagated to the Hadoop configuration
in SparkHadoopUtil, the fs.s3a.endpoint value is set to
"s3.amazonaws.com" if it is unset and no explicit region
is set in fs.s3a.endpoint.region.
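
Explicit settings still win: a `spark.hadoop.*` key set by the user is copied across unchanged, and the new fallback applies only when neither the endpoint nor the region has been configured. A minimal sketch of that interaction (it mirrors the new tests below and, like them, assumes it runs inside the `org.apache.spark.deploy` package, since the helper is package-private; the endpoint value is just a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// A user-supplied endpoint is propagated as-is and is not replaced
// by the s3.amazonaws.com fallback.
val sc = new SparkConf()
  .set("spark.hadoop.fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")
val hadoopConf = new Configuration(false)
new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
assert(hadoopConf.get("fs.s3a.endpoint") == "s3-eu-west-1.amazonaws.com")
```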

### Why are the changes needed?

A regression in Hadoop 3.3.1 causes S3A filesystem instantiation
to fail outside EC2 deployments when the host has neither a region
declared in the AWS CLI configuration (~/.aws/config) nor the
`AWS_REGION` environment variable set.

HADOOP-17771 fixes this in Hadoop 3.3.2+, but this Spark patch
corrects the behavior when running Spark with the 3.3.1 artifacts.

It is harmless for older versions and compatible with Hadoop
releases containing the HADOOP-17771 fix.
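
For deployments that remain on the unpatched 3.3.1 artifacts, a hedged workaround is to pin the region yourself through the same `spark.hadoop.*` propagation (assuming your Hadoop build honours the fs.s3a.endpoint.region option this patch checks for; the region name below is only a placeholder):

```scala
import org.apache.spark.SparkConf

// Pin an explicit region so the SDK does not need to resolve one from
// EC2 metadata, ~/.aws/config or the AWS_REGION environment variable.
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.endpoint.region", "eu-west-1")
```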

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests verify the propagation logic from the Spark conf to the Hadoop conf.

Closes #33064 from steveloughran/SPARK-35878-regions.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
steveloughran authored and dongjoon-hyun committed Jun 25, 2021
1 parent 9814cf8 commit 36aaaa1
Showing 2 changed files with 111 additions and 0 deletions.
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -421,6 +421,9 @@ private[spark] object SparkHadoopUtil extends Logging {
   * Returns a Configuration object with Spark configuration applied on top. Unlike
   * the instance method, this will always return a Configuration instance, and not a
   * cluster manager-specific type.
   * The configuration will load all default values set in core-default.xml,
   * and if found on the classpath, those of core-site.xml.
   * This is done before the spark overrides are applied.
   */
  private[spark] def newConfiguration(conf: SparkConf): Configuration = {
    val hadoopConf = new Configuration()
@@ -487,6 +490,18 @@
    if (conf.getOption("spark.hadoop.fs.s3a.downgrade.syncable.exceptions").isEmpty) {
      hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true")
    }
    // In Hadoop 3.3.1, AWS region handling with the default "" endpoint only works
    // in EC2 deployments or when the AWS CLI is installed.
    // The workaround is to set the name of the S3 endpoint explicitly,
    // if not already set. See HADOOP-17771.
    // This change is harmless on older versions and compatible with
    // later Hadoop releases.
    if (hadoopConf.get("fs.s3a.endpoint", "").isEmpty &&
        hadoopConf.get("fs.s3a.endpoint.region") == null) {
      // set to US central endpoint which can also connect to buckets
      // in other regions at the expense of a HEAD request during fs creation
      hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    }
  }

private def appendSparkHiveConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
96 changes: 96 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkFunSuite}

class SparkHadoopUtilSuite extends SparkFunSuite {

  /**
   * Verify that spark.hadoop options are propagated, and that
   * the default s3a options are set as expected.
   */
  test("appendSparkHadoopConfigs with propagation and defaults") {
    val sc = new SparkConf()
    val hadoopConf = new Configuration(false)
    sc.set("spark.hadoop.orc.filterPushdown", "true")
    new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
    assertConfigValue(hadoopConf, "orc.filterPushdown", "true")
    assertConfigValue(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", "true")
    assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com")
  }

  /**
   * An empty S3A endpoint will be overridden just as a null value
   * would.
   */
  test("appendSparkHadoopConfigs with S3A endpoint set to empty string") {
    val sc = new SparkConf()
    val hadoopConf = new Configuration(false)
    sc.set("spark.hadoop.fs.s3a.endpoint", "")
    new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
    assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com")
  }

  /**
   * Explicitly set the patched s3a options and verify that they are not overridden.
   */
  test("appendSparkHadoopConfigs with S3A options explicitly set") {
    val sc = new SparkConf()
    val hadoopConf = new Configuration(false)
    sc.set("spark.hadoop.fs.s3a.downgrade.syncable.exceptions", "false")
    sc.set("spark.hadoop.fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")
    new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
    assertConfigValue(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", "false")
    assertConfigValue(hadoopConf, "fs.s3a.endpoint",
      "s3-eu-west-1.amazonaws.com")
  }

  /**
   * If the endpoint region is set (even to a blank string) in
   * "spark.hadoop.fs.s3a.endpoint.region" then the endpoint is not set,
   * even when the s3a endpoint is "".
   * This supports a feature in later hadoop versions where this configuration
   * pair triggers a revert to the "SDK to work out the region" algorithm,
   * which works on EC2 deployments.
   */
  test("appendSparkHadoopConfigs with S3A endpoint region set to an empty string") {
    val sc = new SparkConf()
    val hadoopConf = new Configuration(false)
    sc.set("spark.hadoop.fs.s3a.endpoint.region", "")
    new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
    // the endpoint value will not have been set
    assertConfigValue(hadoopConf, "fs.s3a.endpoint", null)
  }

  /**
   * Assert that a hadoop configuration option has the expected value.
   * @param hadoopConf configuration to query
   * @param key key to look up
   * @param expected expected value.
   */
  private def assertConfigValue(
      hadoopConf: Configuration,
      key: String,
      expected: String): Unit = {
    assert(hadoopConf.get(key) === expected,
      s"Mismatch in expected value of $key")
  }
}
