Skip to content

Commit

Permalink
[AMORO-2960] Support Paimon S3-based catalog (#2972)
Browse files Browse the repository at this point in the history
* [AMORO-2960] Support paimon s3 based catalog

* trigger ci
  • Loading branch information
xleoken committed Jun 28, 2024
1 parent c53ec2b commit 97f299c
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 7 deletions.
5 changes: 4 additions & 1 deletion amoro-ams/amoro-ams-dashboard/src/views/catalogs/Detail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,12 @@ const storageConfigTypeOps = computed(() => {
else if (type === 'glue') {
return storageConfigTypeS3
}
else if (type === 'hive' || type === 'hadoop') {
else if (type === 'hive') {
return storageConfigTypeHadoop
}
else if (type === 'hadoop') {
return storageConfigTypeHadoopS3
}
else {
return null
}
Expand Down
5 changes: 5 additions & 0 deletions amoro-ams/amoro-ams-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@
<scope>runtime</scope>
</dependency>

<!-- Paimon S3 filesystem support, required for S3-backed Paimon catalogs.
     Version is managed in the root pom's dependencyManagement. -->
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-optimizer-standalone</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, PAIMON));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_S3, PAIMON));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
Expand Down Expand Up @@ -192,7 +194,7 @@ public void getCatalogTypeList(Context ctx) {
String displayKey = "display";
catalogTypes.add(ImmutableMap.of(valueKey, CATALOG_TYPE_AMS, displayKey, "Amoro Metastore"));
catalogTypes.add(ImmutableMap.of(valueKey, CATALOG_TYPE_HIVE, displayKey, "Hive Metastore"));
catalogTypes.add(ImmutableMap.of(valueKey, CATALOG_TYPE_HADOOP, displayKey, "Hadoop"));
catalogTypes.add(ImmutableMap.of(valueKey, CATALOG_TYPE_HADOOP, displayKey, "Filesystem"));
catalogTypes.add(ImmutableMap.of(valueKey, CATALOG_TYPE_GLUE, displayKey, "Glue"));
catalogTypes.add(ImmutableMap.of(valueKey, CATALOG_TYPE_CUSTOM, displayKey, "Custom"));
ctx.json(OkResponse.of(catalogTypes));
Expand Down Expand Up @@ -238,6 +240,11 @@ private void fillAuthConfigs2CatalogMeta(
AUTH_CONFIGS_KEY_PRINCIPAL, serverAuthConfig.get(AUTH_CONFIGS_KEY_PRINCIPAL));
break;
case AUTH_CONFIGS_VALUE_TYPE_AK_SK:
metaAuthConfig.put(
AUTH_CONFIGS_KEY_ACCESS_KEY, serverAuthConfig.get(AUTH_CONFIGS_KEY_ACCESS_KEY));
metaAuthConfig.put(
AUTH_CONFIGS_KEY_SECRET_KEY, serverAuthConfig.get(AUTH_CONFIGS_KEY_SECRET_KEY));

MixedCatalogUtil.copyProperty(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
import java.util.Optional;

public class PaimonCatalogFactory implements FormatCatalogFactory {

/** Paimon catalog option key carrying the S3 access key id. */
public static final String PAIMON_S3_ACCESS_KEY = "s3.access-key";
/** Paimon catalog option key carrying the S3 secret access key. */
public static final String PAIMON_S3_SECRET_KEY = "s3.secret-key";

@Override
public PaimonCatalog create(
String name, String metastoreType, Map<String, String> properties, TableMetaStore metaStore) {
Expand All @@ -49,8 +53,18 @@ public PaimonCatalog create(
url ->
catalogProperties.put(
HiveCatalogOptions.HIVE_CONF_DIR.key(), new File(url.getPath()).getParent()));
Catalog catalog = paimonCatalog(catalogProperties, metaStore.getConfiguration());
return new PaimonCatalog(catalog, name);

if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(
metaStore.getAuthMethod())) {
// s3.access-key, s3.secret-key
catalogProperties.put(PAIMON_S3_ACCESS_KEY, metaStore.getAccessKey());
catalogProperties.put(PAIMON_S3_SECRET_KEY, metaStore.getSecretKey());
Catalog catalog = paimonCatalog(catalogProperties, new Configuration());
return new PaimonCatalog(catalog, name);
} else {
Catalog catalog = paimonCatalog(catalogProperties, metaStore.getConfiguration());
return new PaimonCatalog(catalog, name);
}
}

public static Catalog paimonCatalog(Map<String, String> properties, Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.table;

import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Charsets;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class TableMetaStore implements Serializable {
/** Property key under which the configured authentication method is stored. */
public static final String AUTH_METHOD = "auth.method";
/** Property key for the Kerberos principal used at keytab login. */
public static final String KEYTAB_LOGIN_USER = "krb.principal";
/** Property key for the Hadoop user name used by SIMPLE authentication. */
public static final String SIMPLE_USER_NAME = "simple.user.name";
// NOTE(review): value is lower-case "ak/sk" while the sibling methods are
// upper-case "SIMPLE"/"KERBEROS"; callers appear to mix equals and
// equalsIgnoreCase when comparing — confirm the intended casing convention.
public static final String AUTH_METHOD_AK_SK = "ak/sk";
public static final String AUTH_METHOD_SIMPLE = "SIMPLE";
public static final String AUTH_METHOD_KERBEROS = "KERBEROS";

Expand Down Expand Up @@ -126,6 +128,10 @@ public class TableMetaStore implements Serializable {
private transient RuntimeContext runtimeContext;
private transient String authInformation;

// Credentials recorded when AK/SK authentication is configured via
// Builder.withAkSkAuth; remain null for other auth methods.
// NOTE(review): non-final unlike the surrounding state — presumably only
// assigned in the constructor; confirm no post-construction mutation.
private String accessKey;

private String secretKey;

/**
 * Creates a new {@link Builder} for assembling a {@link TableMetaStore}.
 *
 * @return a fresh builder with default settings
 */
public static Builder builder() {
return new Builder();
}
Expand All @@ -139,11 +145,14 @@ private TableMetaStore(
byte[] krbKeyTab,
byte[] krbConf,
String krbPrincipal,
String accessKey,
String secretKey,
boolean disableAuth) {
Preconditions.checkArgument(
authMethod == null
|| AUTH_METHOD_SIMPLE.equals(authMethod)
|| AUTH_METHOD_KERBEROS.equals(authMethod),
|| AUTH_METHOD_KERBEROS.equals(authMethod)
|| AUTH_METHOD_AK_SK.equals(authMethod),
"Error auth method:%s",
authMethod);
this.metaStoreSite = metaStoreSite;
Expand All @@ -155,6 +164,8 @@ private TableMetaStore(
this.krbConf = krbConf;
this.krbPrincipal = krbPrincipal;
this.disableAuth = disableAuth;
this.accessKey = accessKey;
this.secretKey = secretKey;
}

public byte[] getMetaStoreSite() {
Expand Down Expand Up @@ -185,6 +196,14 @@ public String getAuthMethod() {
return authMethod;
}

/**
 * Returns the configured access key id, or {@code null} when AK/SK
 * authentication was not configured.
 */
public String getAccessKey() {
return accessKey;
}

/**
 * Returns the configured secret access key, or {@code null} when AK/SK
 * authentication was not configured.
 */
public String getSecretKey() {
return secretKey;
}

/**
 * Tells whether this meta store authenticates via Kerberos
 * (case-insensitive comparison; {@code false} when no auth method is set).
 */
public boolean isKerberosAuthMethod() {
final String configuredMethod = authMethod;
return configuredMethod != null && configuredMethod.equalsIgnoreCase(AUTH_METHOD_KERBEROS);
}
Expand All @@ -203,7 +222,8 @@ public synchronized UserGroupInformation getUGI() {

public <T> T doAs(Callable<T> callable) {
// if disableAuth, use process ugi to execute
if (disableAuth) {
if (disableAuth
|| CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authMethod)) {
return call(callable);
}
return Objects.requireNonNull(getUGI()).doAs((PrivilegedAction<T>) () -> call(callable));
Expand Down Expand Up @@ -541,6 +561,9 @@ public static class Builder {
private byte[] krbKeyTab;
private byte[] krbConf;
private String krbPrincipal;

private String accessKey;
private String secretKey;
private boolean disableAuth = true;
private final Map<String, String> properties = Maps.newHashMap();
private Configuration configuration;
Expand Down Expand Up @@ -595,6 +618,14 @@ public Builder withBase64CoreSite(String encodedCoreSite) {
return this;
}

/**
 * Configures access-key / secret-key (AK/SK) authentication, e.g. for
 * S3-backed catalogs, and marks authentication as enabled.
 *
 * @param accessKey the access key id
 * @param secretKey the secret access key
 * @return this builder, for chaining
 */
public Builder withAkSkAuth(String accessKey, String secretKey) {
this.authMethod = AUTH_METHOD_AK_SK;
this.accessKey = accessKey;
this.secretKey = secretKey;
this.disableAuth = false;
return this;
}

public Builder withSimpleAuth(String hadoopUsername) {
this.disableAuth = false;
this.authMethod = AUTH_METHOD_SIMPLE;
Expand Down Expand Up @@ -715,7 +746,7 @@ private void readProperties() {

public TableMetaStore build() {
readProperties();
if (!disableAuth) {
if (!disableAuth & !AUTH_METHOD_AK_SK.equals(authMethod)) {
Preconditions.checkNotNull(hdfsSite);
Preconditions.checkNotNull(coreSite);
}
Expand All @@ -725,6 +756,9 @@ public TableMetaStore build() {
Preconditions.checkNotNull(krbConf);
Preconditions.checkNotNull(krbKeyTab);
Preconditions.checkNotNull(krbPrincipal);
} else if (AUTH_METHOD_AK_SK.equals(authMethod)) {
Preconditions.checkNotNull(accessKey);
Preconditions.checkNotNull(secretKey);
} else if (authMethod != null) {
throw new IllegalArgumentException("Unsupported auth method:" + authMethod);
}
Expand All @@ -743,6 +777,8 @@ public TableMetaStore build() {
krbKeyTab,
krbConf,
krbPrincipal,
accessKey,
secretKey,
disableAuth);
}

Expand All @@ -759,6 +795,8 @@ public TableMetaStore buildForTest() {
krbKeyTab,
krbConf,
krbPrincipal,
accessKey,
secretKey,
disableAuth);
tableMetaStore.getRuntimeContext().setConfiguration(configuration);
return tableMetaStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) {
String keytab = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB);
String principal = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL);
builder.withBase64KrbAuth(keytab, krb5, principal);
} else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authType)) {
String accessKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY);
String secretKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY);
builder.withAkSkAuth(accessKey, secretKey);
}
}
}
Expand All @@ -200,6 +204,16 @@ public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) {
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL);
builder.withBase64KrbAuth(keytab, krb5, principal);
} else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authType)) {
String accessKey =
catalogMeta
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY);
String secretKey =
catalogMeta
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY);
builder.withAkSkAuth(accessKey, secretKey);
}
}
return builder.build();
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,12 @@
<version>${paimon.version}</version>
</dependency>

<!-- Managed version for Paimon's S3 filesystem module; consumed by modules
     that need S3-backed Paimon catalogs (e.g. amoro-ams-server). -->
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-s3</artifactId>
<version>${paimon.version}</version>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down

0 comments on commit 97f299c

Please sign in to comment.