Skip to content

Add support for customer write.data.path and write.metadata.path with test for object store location provider #193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,24 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.polaris.core.PolarisConfiguration;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.admin.model.Catalog;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityConstants;
import org.apache.polaris.core.entity.TableLikeEntity;
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
Expand Down Expand Up @@ -174,11 +180,31 @@ public static Optional<PolarisStorageConfigurationInfo> forEntityPath(
LOGGER.debug(
"Allowing unstructured table location for entity: {}",
entityPathReversed.get(0).getName());
return configInfo;

List<String> locs =
userSpecifiedWriteLocations(entityPathReversed.get(0).getPropertiesAsMap());
return new StorageConfigurationOverride(
configInfo,
ImmutableList.<String>builder()
.addAll(configInfo.getAllowedLocations())
.addAll(locs)
.build());
}
});
}

private static List<String> userSpecifiedWriteLocations(Map<String, String> properties) {
return Optional.ofNullable(properties)
.map(
p ->
Stream.of(
p.get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY),
p.get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY))
.filter(Objects::nonNull)
.collect(Collectors.toList()))
.orElse(List.of());
}

private static @NotNull Optional<PolarisEntity> findStorageInfoFromHierarchy(
List<PolarisEntity> entityPath) {
for (int i = entityPath.size() - 1; i >= 0; i--) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,20 @@ private Set<String> getLocationsAllowedToBeAccessed(TableMetadata tableMetadata)
Set<String> locations = new HashSet<>();
locations.add(concatFilePrefixes(basicLocation, "data/", "/"));
locations.add(concatFilePrefixes(basicLocation, "metadata/", "/"));
if (tableMetadata
.properties()
.containsKey(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)) {
locations.add(
tableMetadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY));
}
if (tableMetadata
.properties()
.containsKey(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) {
locations.add(
tableMetadata
.properties()
.get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY));
}
return locations;
}

Expand Down Expand Up @@ -854,10 +868,6 @@ private Map<String, String> refreshCredentials(
Set<PolarisStorageActions> storageActions,
Set<String> tableLocations,
PolarisEntity entity) {
// Important: Any locations added to the set of requested locations need to be validated
// prior to requested subscoped credentials.
tableLocations.forEach(tl -> validateLocationForTableLike(tableIdentifier, tl));

Boolean skipCredentialSubscopingIndirection =
getBooleanContextConfiguration(
SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION, SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION_DEFAULT);
Expand Down Expand Up @@ -924,6 +934,17 @@ private void validateLocationForTableLike(
TableIdentifier identifier,
String location,
PolarisResolvedPathWrapper resolvedStorageEntity) {
validateLocationsForTableLike(identifier, Set.of(location), resolvedStorageEntity);
}

/**
* Validates that the specified {@code locations} are valid for whatever storage config is found
* for this TableLike's parent hierarchy.
*/
private void validateLocationsForTableLike(
TableIdentifier identifier,
Set<String> locations,
PolarisResolvedPathWrapper resolvedStorageEntity) {
Optional<PolarisStorageConfigurationInfo> optStorageConfiguration =
PolarisStorageConfigurationInfo.forEntityPath(
callContext.getPolarisCallContext().getDiagServices(),
Expand All @@ -934,7 +955,7 @@ private void validateLocationForTableLike(
Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>>
validationResults =
InMemoryStorageIntegration.validateSubpathsOfAllowedLocations(
storageConfigInfo, Set.of(PolarisStorageActions.ALL), Set.of(location));
storageConfigInfo, Set.of(PolarisStorageActions.ALL), locations);
validationResults
.values()
.forEach(
Expand All @@ -945,12 +966,12 @@ private void validateLocationForTableLike(
result -> {
if (!result.isSuccess()) {
throw new ForbiddenException(
"Invalid location '%s' for identifier '%s': %s",
location, identifier, result.getMessage());
"Invalid locations '%s' for identifier '%s': %s",
locations, identifier, result.getMessage());
} else {
LOGGER.debug(
"Validated location '{}' for identifier '{}'",
location,
"Validated locations '{}' for identifier '{}'",
locations,
identifier);
}
}));
Expand All @@ -975,10 +996,14 @@ private void validateLocationForTableLike(
// }
},
() -> {
if (location.startsWith("file:") || location.startsWith("http")) {
List<String> invalidLocations =
locations.stream()
.filter(location -> location.startsWith("file:") || location.startsWith("http"))
.collect(Collectors.toList());
if (!invalidLocations.isEmpty()) {
throw new ForbiddenException(
"Invalid location '%s' for identifier '%s': File locations are not allowed",
location, identifier);
"Invalid locations '%s' for identifier '%s': File locations are not allowed",
invalidLocations, identifier);
}
});
}
Expand All @@ -998,8 +1023,7 @@ private void validateNoLocationOverlap(
LOGGER.debug("Skipping location overlap validation for identifier '{}'", identifier);
} else { // if (entity.getSubType().equals(PolarisEntitySubType.TABLE)) {
// TODO - is this necessary for views? overlapping views do not expose subdirectories via the
// credential vending
// so this feels like an unnecessary restriction
// credential vending so this feels like an unnecessary restriction
LOGGER.debug("Validating no overlap with sibling tables or namespaces");
validateNoLocationOverlap(location, resolvedNamespace, identifier.name());
}
Expand Down Expand Up @@ -1249,12 +1273,31 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
: resolvedTableEntities.getRawParentPath();
CatalogEntity catalog = CatalogEntity.of(resolvedNamespace.getFirst());

if (base == null || !metadata.location().equals(base.location())) {
if (base == null
|| !metadata.location().equals(base.location())
|| !Objects.equal(
base.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY),
metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY))) {
// If location is changing then we must validate that the requested location is valid
// for the storage configuration inherited under this entity's path.
validateLocationForTableLike(tableIdentifier, metadata.location(), resolvedStorageEntity);
// also validate that the view location doesn't overlap an existing table
validateNoLocationOverlap(tableIdentifier, resolvedNamespace, metadata.location());
Set<String> dataLocations = new HashSet<>();
dataLocations.add(metadata.location());
if (metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)
!= null) {
dataLocations.add(
metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY));
}
if (metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)
!= null) {
dataLocations.add(
metadata
.properties()
.get(TableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY));
}
validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity);
// also validate that the table location doesn't overlap an existing table
dataLocations.forEach(
location -> validateNoLocationOverlap(tableIdentifier, resolvedNamespace, location));
// and that the metadata file points to a location within the table's directory structure
if (metadata.metadataFileLocation() != null) {
validateMetadataFileInTableDir(tableIdentifier, metadata, catalog);
Expand Down Expand Up @@ -1921,7 +1964,6 @@ private List<TableIdentifier> listTableLike(PolarisEntitySubType subType, Namesp
* @return FileIO object
*/
private FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
blockedUserSpecifiedWriteLocation(properties);
Map<String, String> propertiesWithS3CustomizedClientFactory = new HashMap<>(properties);
propertiesWithS3CustomizedClientFactory.put(
S3FileIOProperties.CLIENT_FACTORY, PolarisS3FileIOClientFactory.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,7 @@ public void testIcebergCreateTablesWithWritePathBlocked(TestInfo testInfo) throw
.withProperties(Map.of("write.data.path", "s3://my-bucket/path/to/data"))
.create())
.isInstanceOf(ForbiddenException.class)
.hasMessage(
"Forbidden: Delegate access to table with user-specified write location is temporarily not supported.");
.hasMessageContaining("Forbidden: Invalid locations");

Assertions.assertThatThrownBy(
() ->
Expand All @@ -461,8 +460,7 @@ public void testIcebergCreateTablesWithWritePathBlocked(TestInfo testInfo) throw
Map.of("write.metadata.path", "s3://my-bucket/path/to/data"))
.create())
.isInstanceOf(ForbiddenException.class)
.hasMessage(
"Forbidden: Delegate access to table with user-specified write location is temporarily not supported.");
.hasMessageContaining("Forbidden: Invalid locations");
} catch (BadRequestException e) {
LOGGER.info("Received expected exception {}", e.getMessage());
}
Expand Down
Loading