Skip to content

Commit

Permalink
[BugFix] Fix iceberg catalog's aws region problem (#35088)
Browse files Browse the repository at this point in the history
Why I'm doing:
When the user didn't set region in iceberg catalog, we need to detect region, if detect failed, set default region us-east-1.

What I'm doing:
1. Make people using iceberg + s3 happy.
2. Support path style access parameters.
3. If user didn't configure any credentials, using SDK default behavior.

Signed-off-by: Smith Cruise <chendingchao1@126.com>
(cherry picked from commit 523a2cf)
  • Loading branch information
Smith-Cruise authored and wanpengfei-git committed Nov 16, 2023
1 parent 8d43a6d commit cfa8cfb
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@
import com.starrocks.credential.aws.AWSCloudConfigurationProvider;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.profiles.ProfileFile;
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
Expand All @@ -49,6 +55,7 @@
import static com.starrocks.credential.CloudConfigurationConstants.AWS_GLUE_USE_AWS_SDK_DEFAULT_BEHAVIOR;
import static com.starrocks.credential.CloudConfigurationConstants.AWS_GLUE_USE_INSTANCE_PROFILE;
import static com.starrocks.credential.CloudConfigurationConstants.AWS_S3_ACCESS_KEY;
import static com.starrocks.credential.CloudConfigurationConstants.AWS_S3_ENABLE_PATH_STYLE_ACCESS;
import static com.starrocks.credential.CloudConfigurationConstants.AWS_S3_ENDPOINT;
import static com.starrocks.credential.CloudConfigurationConstants.AWS_S3_EXTERNAL_ID;
import static com.starrocks.credential.CloudConfigurationConstants.AWS_S3_IAM_ROLE_ARN;
Expand All @@ -59,6 +66,7 @@
import static com.starrocks.credential.CloudConfigurationConstants.AWS_S3_USE_INSTANCE_PROFILE;

public class IcebergAwsClientFactory implements AwsClientFactory {
private static final Logger LOG = LogManager.getLogger(IcebergAwsClientFactory.class);
public static final String HTTPS_SCHEME = "https://";

private AwsProperties awsProperties;
Expand All @@ -72,7 +80,7 @@ public class IcebergAwsClientFactory implements AwsClientFactory {
private String s3ExternalId;
private String s3Region;
private String s3Endpoint;

private boolean s3EnablePathStyleAccess;
private boolean glueUseAWSSDKDefaultBehavior;
private boolean glueUseInstanceProfile;
private String glueAccessKey;
Expand All @@ -94,8 +102,10 @@ public void initialize(Map<String, String> properties) {
s3SessionToken = properties.getOrDefault(AWS_S3_SESSION_TOKEN, "");
s3IamRoleArn = properties.getOrDefault(AWS_S3_IAM_ROLE_ARN, "");
s3ExternalId = properties.getOrDefault(AWS_S3_EXTERNAL_ID, "");
s3Region = properties.getOrDefault(AWS_S3_REGION, AWSCloudConfigurationProvider.DEFAULT_AWS_REGION);
s3Region = properties.getOrDefault(AWS_S3_REGION, "");
s3Endpoint = properties.getOrDefault(AWS_S3_ENDPOINT, "");
s3EnablePathStyleAccess =
Boolean.parseBoolean(properties.getOrDefault(AWS_S3_ENABLE_PATH_STYLE_ACCESS, "false"));

glueUseAWSSDKDefaultBehavior = Boolean.parseBoolean(
properties.getOrDefault(AWS_GLUE_USE_AWS_SDK_DEFAULT_BEHAVIOR, "false"));
Expand All @@ -105,18 +115,16 @@ public void initialize(Map<String, String> properties) {
glueSessionToken = properties.getOrDefault(AWS_GLUE_SESSION_TOKEN, "");
glueIamRoleArn = properties.getOrDefault(AWS_GLUE_IAM_ROLE_ARN, "");
glueExternalId = properties.getOrDefault(AWS_GLUE_EXTERNAL_ID, "");
glueRegion = properties.getOrDefault(AWS_GLUE_REGION, AWSCloudConfigurationProvider.DEFAULT_AWS_REGION);
glueRegion = properties.getOrDefault(AWS_GLUE_REGION, "");
glueEndpoint = properties.getOrDefault(AWS_GLUE_ENDPOINT, "");
}

private StsAssumeRoleCredentialsProvider getAssumeRoleCredentialsProvider(AwsCredentialsProvider baseCredentials,
String iamRoleArn, String externalId,
String region) {
Region region) {
// Build sts client
StsClientBuilder stsClientBuilder = StsClient.builder().credentialsProvider(baseCredentials);
if (!region.isEmpty()) {
stsClientBuilder.region(Region.of(region));
}
stsClientBuilder.region(region);

// Build AssumeRoleRequest
AssumeRoleRequest.Builder assumeRoleBuilder = AssumeRoleRequest.builder();
Expand All @@ -141,23 +149,28 @@ public S3Client s3() {
getBaseAWSCredentialsProvider(s3UseAWSSDKDefaultBehavior, s3UseInstanceProfile, s3AccessKey,
s3SecretKey, s3SessionToken);
S3ClientBuilder s3ClientBuilder = S3Client.builder();

Region region = tryToResolveRegion(s3Region);

if (!s3IamRoleArn.isEmpty()) {
s3ClientBuilder.credentialsProvider(getAssumeRoleCredentialsProvider(baseAWSCredentialsProvider,
s3IamRoleArn, s3ExternalId, s3Region));
s3IamRoleArn, s3ExternalId, region));
} else {
s3ClientBuilder.credentialsProvider(baseAWSCredentialsProvider);
}

if (!s3Region.isEmpty()) {
s3ClientBuilder.region(Region.of(s3Region));
}
s3ClientBuilder.region(region);

// To prevent the 's3ClientBuilder' (NPE) exception, when 'aws.s3.endpoint' does not have
// 'scheme', we will add https scheme.
if (!s3Endpoint.isEmpty()) {
s3ClientBuilder.endpointOverride(ensureSchemeInEndpoint(s3Endpoint));
}

// set for s3 path style access
s3ClientBuilder.serviceConfiguration(
S3Configuration.builder().pathStyleAccessEnabled(s3EnablePathStyleAccess).build());

return s3ClientBuilder.build();
}

Expand All @@ -167,16 +180,17 @@ public GlueClient glue() {
getBaseAWSCredentialsProvider(glueUseAWSSDKDefaultBehavior, glueUseInstanceProfile, glueAccessKey,
glueSecretKey, glueSessionToken);
GlueClientBuilder glueClientBuilder = GlueClient.builder();

Region region = tryToResolveRegion(glueRegion);

if (!glueIamRoleArn.isEmpty()) {
glueClientBuilder.credentialsProvider(getAssumeRoleCredentialsProvider(baseAWSCredentialsProvider,
glueIamRoleArn, glueExternalId, glueRegion));
glueIamRoleArn, glueExternalId, region));
} else {
glueClientBuilder.credentialsProvider(baseAWSCredentialsProvider);
}

if (!glueRegion.isEmpty()) {
glueClientBuilder.region(Region.of(glueRegion));
}
glueClientBuilder.region(region);

// To prevent the 'glueClientBuilder' (NPE) exception, when 'aws.s3.endpoint' does not have
// 'scheme', we will add https scheme.
Expand All @@ -200,7 +214,9 @@ private AwsCredentialsProvider getBaseAWSCredentialsProvider(boolean useAWSSDKDe
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
}
} else {
throw new IllegalArgumentException("Please configure the correct aws authentication parameters");
LOG.info("User didn't configure any credentials in Iceberg catalog, " +
"we will use AWS DefaultCredentialsProvider instead");
return DefaultCredentialsProvider.builder().build();
}
}

Expand All @@ -227,4 +243,22 @@ public static URI ensureSchemeInEndpoint(String endpoint) {
}
return URI.create(HTTPS_SCHEME + endpoint);
}

public static Region tryToResolveRegion(String strRegion) {
if (!strRegion.isEmpty()) {
return Region.of(strRegion);
}
Region region = Region.of(AWSCloudConfigurationProvider.DEFAULT_AWS_REGION);
try {
DefaultAwsRegionProviderChain providerChain = DefaultAwsRegionProviderChain.builder()
.profileFile(ProfileFile::defaultProfileFile)
.profileName(ProfileFileSystemSetting.AWS_PROFILE.getStringValueOrThrow()).build();
region = providerChain.getRegion();
} catch (Exception e) {
LOG.info(
"AWS sdk unable to load region from DefaultAwsRegionProviderChain, using default region us-east-1 instead",
e);
}
return region;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,47 @@

package com.starrocks.connector.iceberg;

import com.starrocks.credential.CloudConfigurationConstants;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.regions.Region;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class IcebergAwsClientFactoryTest {
@Before
public void setup() {
System.setProperty("software.amazon.awssdk.http.service.impl",
"software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
}

@Test
public void testInvalidCredential() {
IcebergAwsClientFactory factory = new IcebergAwsClientFactory();
public void testAKSK() {
Map<String, String> properties = new HashMap<>();
properties.put(CloudConfigurationConstants.AWS_S3_ACCESS_KEY, "ak");
properties.put(CloudConfigurationConstants.AWS_S3_SECRET_KEY, "sk");
properties.put(CloudConfigurationConstants.AWS_S3_ENDPOINT, "endpoint");
properties.put(CloudConfigurationConstants.AWS_S3_REGION, "xxx");

properties.put(CloudConfigurationConstants.AWS_GLUE_ACCESS_KEY, "ak");
properties.put(CloudConfigurationConstants.AWS_GLUE_SECRET_KEY, "sk");
properties.put(CloudConfigurationConstants.AWS_GLUE_ENDPOINT, "endpoint");
properties.put(CloudConfigurationConstants.AWS_GLUE_REGION, "region");
IcebergAwsClientFactory factory = new IcebergAwsClientFactory();
factory.initialize(properties);
Assert.assertThrows(IllegalArgumentException.class, factory::s3);
Assert.assertThrows(IllegalArgumentException.class, factory::glue);
Assert.assertNotNull(factory.s3());
Assert.assertNotNull(factory.glue());

Assert.assertNull(factory.dynamo());
Assert.assertNull(factory.kms());
}

@Test
public void testResolveRegion() {
Assert.assertEquals(Region.US_WEST_1, IcebergAwsClientFactory.tryToResolveRegion("us-west-1"));
}

@Test
Expand Down

0 comments on commit cfa8cfb

Please sign in to comment.