Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.rest.ErrorHandler;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types;

/**
* A simple, non-exhaustive set of helper methods for accessing the Iceberg REST API.
Expand All @@ -53,17 +56,36 @@ public CatalogApi(Client client, PolarisApiEndpoints endpoints, String authToken
}

public void createNamespace(String catalogName, String namespaceName) {
String[] namespaceLevels = namespaceName.split("\\u001F");
try (Response response =
request("v1/{cat}/namespaces", Map.of("cat", catalogName))
.post(
Entity.json(
CreateNamespaceRequest.builder()
.withNamespace(Namespace.of(namespaceName))
.withNamespace(Namespace.of(namespaceLevels))
.build()))) {
assertThat(response).returns(Response.Status.OK.getStatusCode(), Response::getStatus);
}
}

public void createTable(String catalogName, String namespaceName, String tableName) {
String[] namespaceLevels = namespaceName.split("\\u001F");
String encodedNamespace = RESTUtil.encodeNamespace(Namespace.of(namespaceLevels));
Schema schema =
new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()));

CreateTableRequest request =
CreateTableRequest.builder().withName(tableName).withSchema(schema).build();

try (Response response =
request("v1/{cat}/namespaces/" + encodedNamespace + "/tables", Map.of("cat", catalogName))
.post(Entity.json(request))) {
assertThat(response).returns(Response.Status.OK.getStatusCode(), Response::getStatus);
}
}

public List<Namespace> listNamespaces(String catalog, Namespace parent) {
Map<String, String> queryParams = new HashMap<>();
if (!parent.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,97 @@ public void dropCatalog(String catalogName) {

deleteCatalog(catalogName);
}

// Storage Configuration Management Methods

/**
* Get the storage configuration for a namespace.
*
* @param catalogName the catalog name
* @param namespace the namespace (use unit separator 0x1F for multipart namespaces)
* @return Response with StorageConfigInfo or error
*/
public Response getNamespaceStorageConfig(String catalogName, String namespace) {
return request(
"v1/catalogs/{cat}/namespaces/{ns}/storage-config",
Map.of("cat", catalogName, "ns", namespace))
.get();
}

/**
* Set the storage configuration for a namespace.
*
* @param catalogName the catalog name
* @param namespace the namespace (use unit separator 0x1F for multipart namespaces)
* @param storageConfig the storage configuration to set
* @return Response with NamespaceStorageConfigResponse or error
*/
public Response setNamespaceStorageConfig(
String catalogName, String namespace, Object storageConfig) {
return request(
"v1/catalogs/{cat}/namespaces/{ns}/storage-config",
Map.of("cat", catalogName, "ns", namespace))
.put(Entity.json(storageConfig));
}

/**
* Delete the storage configuration override for a namespace.
*
* @param catalogName the catalog name
* @param namespace the namespace (use unit separator 0x1F for multipart namespaces)
* @return Response (204 on success)
*/
public Response deleteNamespaceStorageConfig(String catalogName, String namespace) {
return request(
"v1/catalogs/{cat}/namespaces/{ns}/storage-config",
Map.of("cat", catalogName, "ns", namespace))
.delete();
}

/**
* Get the storage configuration for a table.
*
* @param catalogName the catalog name
* @param namespace the namespace (use unit separator 0x1F for multipart namespaces)
* @param table the table name
* @return Response with StorageConfigInfo or error
*/
public Response getTableStorageConfig(String catalogName, String namespace, String table) {
return request(
"v1/catalogs/{cat}/namespaces/{ns}/tables/{tbl}/storage-config",
Map.of("cat", catalogName, "ns", namespace, "tbl", table))
.get();
}

/**
* Set the storage configuration for a table.
*
* @param catalogName the catalog name
* @param namespace the namespace (use unit separator 0x1F for multipart namespaces)
* @param table the table name
* @param storageConfig the storage configuration to set
* @return Response with TableStorageConfigResponse or error
*/
public Response setTableStorageConfig(
String catalogName, String namespace, String table, Object storageConfig) {
return request(
"v1/catalogs/{cat}/namespaces/{ns}/tables/{tbl}/storage-config",
Map.of("cat", catalogName, "ns", namespace, "tbl", table))
.put(Entity.json(storageConfig));
}

/**
* Delete the storage configuration override for a table.
*
* @param catalogName the catalog name
* @param namespace the namespace (use unit separator 0x1F for multipart namespaces)
* @param table the table name
* @return Response (204 on success)
*/
public Response deleteTableStorageConfig(String catalogName, String namespace, String table) {
return request(
"v1/catalogs/{cat}/namespaces/{ns}/tables/{tbl}/storage-config",
Map.of("cat", catalogName, "ns", namespace, "tbl", table))
.delete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,7 @@ public void before(
catalogName = client.newEntityName("spark_catalog");
externalCatalogName = client.newEntityName("spark_ext_catalog");

AwsStorageConfigInfo awsConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::123456789012:role/my-role")
.setExternalId("externalId")
.setUserArn("userArn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
.build();
AwsStorageConfigInfo awsConfigModel = buildBaseCatalogStorageConfig();
CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data");
props.putAll(s3Container.getS3ConfigProperties());
props.put("polaris.config.drop-with-purge.enabled", "true");
Expand Down Expand Up @@ -135,6 +128,34 @@ protected SparkSession buildSparkSession() {
.getOrCreate();
}

protected AwsStorageConfigInfo buildBaseCatalogStorageConfig() {
AwsStorageConfigInfo.Builder builder =
AwsStorageConfigInfo.builder()
.setExternalId("externalId")
.setUserArn("userArn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"));

if (includeBaseCatalogRoleArn()) {
builder.setRoleArn("arn:aws:iam::123456789012:role/my-role");
}

Boolean stsUnavailable = baseCatalogStsUnavailable();
if (stsUnavailable != null) {
builder.setStsUnavailable(stsUnavailable);
}

return builder.build();
}

protected boolean includeBaseCatalogRoleArn() {
return true;
}

protected Boolean baseCatalogStsUnavailable() {
return null;
}

@AfterEach
public void after() throws Exception {
cleanupCatalog(catalogName);
Expand Down
Loading