Skip to content

Commit 91f0650

Browse files
GrigorievNickvkorukanti
authored and committed
Add support of hadoop-aws s3a SimpleAWSCredentialsProvider to S3DynamoDBLogStore
(Cherry-pick of 86cf73f) - Add support for `SimpleAWSCredentialsProvider` or `TemporaryAWSCredentialsProvider` in the `spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider` option. - Because Delta relies on the Spark and Hadoop FS storage layer, it is natural to be able to authorize the DynamoDB client in the same way as we authorize for S3. Resolves #1235. We use it in production with Spark 3.2 on YARN 2.9.1 and my own fork of Delta 1.2.1. The fork was made from the latest 1.2.1 with the multipart checkpoint commit cherry-picked. Scala 2.12. I have more than 100 tables, where data is ingested every 10 minutes and multiple jobs run daily, such as retention and row-level updates in some files. No. Except maybe that the [official example](https://docs.delta.io/latest/delta-storage.html#quickstart-s3-multi-cluster ) will work in any environment, not only in environments where the node on which the Spark app is scheduled has AWS credentials configured. Please find more details about the reasoning in #1235. Closes #1253 Signed-off-by: Venki Korukanti <venki.korukanti@gmail.com> GitOrigin-RevId: cbcc087457971c91d9908ac44398492bfa49d811
1 parent 86ae53b commit 91f0650

File tree

4 files changed

+175
-11
lines changed

4 files changed

+175
-11
lines changed

storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.delta.storage;
1818

19+
import io.delta.storage.utils.ReflectionUtils;
1920
import org.apache.hadoop.fs.Path;
2021

2122
import java.io.InterruptedIOException;
@@ -279,19 +280,12 @@ private void tryEnsureTableExists(Configuration hadoopConf) throws IOException {
279280

280281
private AmazonDynamoDBClient getClient() throws java.io.IOException {
281282
try {
282-
final AWSCredentialsProvider auth =
283-
(AWSCredentialsProvider) Class.forName(credentialsProviderName)
284-
.getConstructor()
285-
.newInstance();
286-
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(auth);
283+
final AWSCredentialsProvider awsCredentialsProvider =
284+
ReflectionUtils.createAwsCredentialsProvider(credentialsProviderName, initHadoopConf());
285+
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(awsCredentialsProvider);
287286
client.setRegion(Region.getRegion(Regions.fromName(regionName)));
288287
return client;
289-
} catch (
290-
ClassNotFoundException
291-
| InstantiationException
292-
| NoSuchMethodException
293-
| IllegalAccessException
294-
| java.lang.reflect.InvocationTargetException e) {
288+
} catch (ReflectiveOperationException e) {
295289
throw new java.io.IOException(e);
296290
}
297291
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.storage.utils;
18+
19+
import com.amazonaws.auth.AWSCredentialsProvider;
20+
import org.apache.hadoop.conf.Configuration;
21+
22+
import java.util.Arrays;
23+
24+
public class ReflectionUtils {
25+
26+
private static boolean readsCredsFromHadoopConf(Class<?> awsCredentialsProviderClass) {
27+
return Arrays.stream(awsCredentialsProviderClass.getConstructors())
28+
.anyMatch(constructor -> constructor.getParameterCount() == 1 &&
29+
Arrays.equals(constructor.getParameterTypes(), new Class[]{Configuration.class}));
30+
}
31+
32+
/**
33+
* Create AWS credentials provider from given provider classname and {@link Configuration}.
34+
*
35+
* It first check if AWS Credentials Provider class has constructor Hadoop configuration as parameter.
36+
* If yes - create instance of class using this constructor.
37+
* If no - create instance with empty parameters constructor.
38+
*
39+
* @param credentialsProviderClassName Fully qualified name of the desired credentials provider class.
40+
* @param hadoopConf Hadoop configuration, used to create instance of AWS credentials
41+
* provider, if supported.
42+
* @return {@link AWSCredentialsProvider} object, instantiated from the class @see {credentialsProviderClassName}
43+
* @throws ReflectiveOperationException When AWS credentials provider constrictor do not matched.
44+
* Means class has neither an constructor with no args as input
45+
* nor constructor with only Hadoop configuration as argument.
46+
*/
47+
public static AWSCredentialsProvider createAwsCredentialsProvider(
48+
String credentialsProviderClassName,
49+
Configuration hadoopConf) throws ReflectiveOperationException {
50+
Class<?> awsCredentialsProviderClass = Class.forName(credentialsProviderClassName);
51+
if (readsCredsFromHadoopConf(awsCredentialsProviderClass))
52+
return (AWSCredentialsProvider) awsCredentialsProviderClass
53+
.getConstructor(Configuration.class)
54+
.newInstance(hadoopConf);
55+
else
56+
return (AWSCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance();
57+
}
58+
59+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.storage.utils;
18+
19+
import com.amazonaws.auth.AWSCredentials;
20+
import com.amazonaws.auth.AWSCredentialsProvider;
21+
import org.apache.hadoop.conf.Configuration;
22+
23+
public class ReflectionsUtilsSuiteHelper {
24+
// this class only purpose to test DynamoDBLogStore logic to create AWS credentials provider with reflection.
25+
public static class TestOnlyAWSCredentialsProviderWithHadoopConf implements AWSCredentialsProvider {
26+
27+
public TestOnlyAWSCredentialsProviderWithHadoopConf(Configuration hadoopConf) {}
28+
29+
@Override
30+
public AWSCredentials getCredentials() {
31+
return null;
32+
}
33+
34+
@Override
35+
public void refresh() {
36+
37+
}
38+
}
39+
40+
// this class only purpose to test DynamoDBLogStore logic to create AWS credentials provider with reflection.
41+
public static class TestOnlyAWSCredentialsProviderWithUnexpectedConstructor implements AWSCredentialsProvider {
42+
43+
public TestOnlyAWSCredentialsProviderWithUnexpectedConstructor(String hadoopConf) {}
44+
45+
@Override
46+
public AWSCredentials getCredentials() {
47+
return null;
48+
}
49+
50+
@Override
51+
public void refresh() {
52+
53+
}
54+
}
55+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.storage.utils
18+
19+
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider
20+
import io.delta.storage.utils.ReflectionsUtilsSuiteHelper.TestOnlyAWSCredentialsProviderWithHadoopConf
21+
import org.apache.hadoop.conf.Configuration
22+
import org.scalatest.funsuite.AnyFunSuite
23+
24+
class ReflectionsUtilsSuite extends AnyFunSuite {

  private val emptyHadoopConf = new Configuration()

  // Outer class hosting the test-only credentials providers; nested classes are referenced
  // by their binary names ("Outer$Inner") since they are loaded via Class.forName.
  private val helperOuterClass = "io.delta.storage.utils.ReflectionsUtilsSuiteHelper"

  test("support AWS credentials provider with hadoop Configuration as constructor parameter") {
    val provider = ReflectionUtils.createAwsCredentialsProvider(
      helperOuterClass + "$TestOnlyAWSCredentialsProviderWithHadoopConf",
      emptyHadoopConf)
    assert(provider.isInstanceOf[TestOnlyAWSCredentialsProviderWithHadoopConf])
  }

  test("support AWS credentials provider with empty constructor(default from aws lib)") {
    val provider = ReflectionUtils.createAwsCredentialsProvider(
      classOf[EnvironmentVariableCredentialsProvider].getCanonicalName,
      emptyHadoopConf)
    assert(provider.isInstanceOf[EnvironmentVariableCredentialsProvider])
  }

  test("do not support AWS credentials provider with unexpected constructors parameters") {
    // A provider whose only constructor takes a String must be rejected.
    assertThrows[NoSuchMethodException] {
      ReflectionUtils.createAwsCredentialsProvider(
        helperOuterClass + "$TestOnlyAWSCredentialsProviderWithUnexpectedConstructor",
        emptyHadoopConf)
    }
  }
}

0 commit comments

Comments
 (0)