Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.ServiceFailureException;
import org.apache.iceberg.exceptions.UnprocessableEntityException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -508,16 +509,21 @@ private void createNamespaceInternal(
BehaviorChangeConfiguration.ALLOW_NAMESPACE_CUSTOM_LOCATION, catalogEntity)) {
validateNamespaceLocation(entity, resolvedParent);
}
PolarisEntity returnedEntity =
PolarisEntity.of(
getMetaStoreManager()
.createEntityIfNotExists(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
entity));
if (returnedEntity == null) {
throw new AlreadyExistsException(
"Cannot create namespace %s. Namespace already exists", namespace);
EntityResult result =
getMetaStoreManager()
.createEntityIfNotExists(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
entity);
if (!result.isSuccess()) {
if (result.alreadyExists()) {
throw new AlreadyExistsException(
"Cannot create namespace %s. Namespace already exists", namespace);
} else {
throw new ServiceFailureException(
"Unexpected error trying to create namespace %s. Status: %s ExtraInfo: %s",
namespace, result.getReturnStatus(), result.getExtraInformation());
}
}
}

Expand Down Expand Up @@ -2579,7 +2585,23 @@ private void createNonExistingNamespaces(Namespace namespace) {
Namespace parentNamespace = PolarisCatalogHelpers.getParentNamespace(nsLevel);
PolarisResolvedPathWrapper resolvedParent =
resolvedEntityView.getPassthroughResolvedPath(parentNamespace);
createNamespaceInternal(nsLevel, Collections.emptyMap(), resolvedParent);
try {
createNamespaceInternal(nsLevel, Collections.emptyMap(), resolvedParent);
} catch (AlreadyExistsException aee) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we throw AlreadyExistsException if the result is not success without checking whether the return status is ENTITY_ALREADY_EXIST. Despite ENTITY_ALREADY_EXIST is the only error status thrown in existing metastore manager impl, furture development and other implementation may include other error status like UNEXPECTED_ERROR, where we do not want to ignore and log here.

I think we could add an additional check

if (returnedEntity == null) {
throw new AlreadyExistsException(
"Cannot create namespace %s. Namespace already exists", namespace);
}

to only throw AlreadyExistsException when return status is that, otherwise throw a RuntimeException/IllegalState/...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, updated to throw ServiceFailureException if returnStatus isn't ENTITY_ALREADY_EXISTS.

// Since we only attempted to create the namespace after checking that
// getPassthroughResolvedPath for this level is null, this should be a relatively
// infrequent case during high concurrency where another notification already
// conveniently created the namespace between the time we checked and the time
// we attempted to fill it in. It's working as intended in this case to simply
// continue with the existing namespace, but the fact that this collision occurred
// may be relevant to someone running the service in case of unexpected interactions,
// so we'll still log the fact that this happened.
LOGGER
.atInfo()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why INFO? If we expect it to be a normal and expected call path, "informing" log readers every time it happens seems to be an overkill from the operations perspective. I propose debug (non-blocking)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was on the fence. It's one of those cases where it's "normal" as something that will happen time-to-time, but usually isn't deterministically triggered since here the attempt to create the namespace is only made if the immediate preceding call thinks the namespace is missing. So it would be fairly infrequent, only correlating with periods of high parent-creation contention (not just normal periods of high concurrency on an already-existing namespace).

I had even started atWarn but figured that's too alarming, so downgraded to info.

My thought is that if there are unforeseen bugs relating to the state of these "create if needed" namespaces they're more likely to be related to these collisions than to normal operation, so even if someone sets their log-viewer to exclude DEBUG level it'd be easy to eyeball a correlation between this log statement and whatever other buggy behavior comes up.

I also don't feel too strongly though so I'm happy to downgrade it if there's a preference for DEBUG despite this rationale .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. No need to change :)

.setCause(aee)
.addKeyValue("namespace", namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it supposed to be referenced in the message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I actually normally default to the traditional format-string with curly-braces, but I think others have been pushing for the more modern structured logging convention so I'm been forcing myself to use it :)

Structured logging is indeed more beneficial for cases where logs sinks are good at having structured-log elements pulled out into separate semi-structured fields for efficient aggregations, etc.

The convention as I understand it is to make the main log message avoid any direct mention of the structured params: https://www.slf4j.org/manual.html

// using classical API
logger.debug("oldT={} newT={} Temperature changed.", oldT, newT);

// using fluent API 
logger.atDebug().setMessage("Temperature changed.").addKeyValue("oldT", oldT).addKeyValue("newT", newT).log();

As opposed to addArgument which is for the traditional inline string-replacement into the human-friendly string, the addKeyvalue is standalone, and usually log outputters still show the addKeyValue fields conveniently in the log message, but for machine readers they are independent from the natural-language message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also added a comment above the log statement better elucidating the considerations

.log("Namespace already exists in createNonExistingNamespace");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.assertj.core.api.Fail.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -105,6 +106,7 @@
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.exceptions.CommitConflictException;
Expand Down Expand Up @@ -1059,6 +1061,72 @@ public void testUpdateNotificationWhenTableAndNamespacesDontExist() {
.isTrue();
}

@Test
public void testUpdateNotificationWhenTableAndNamespacesDontExistNamespaceRaceCondition() {
Assumptions.assumeTrue(
requiresNamespaceCreate(),
"Only applicable if namespaces must be created before adding children");
Assumptions.assumeTrue(
supportsNestedNamespaces(), "Only applicable if nested namespaces are supported");
Assumptions.assumeTrue(
supportsNotifications(), "Only applicable if notifications are supported");

final String tableLocation = "s3://externally-owned-bucket/table/";
final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json";

// Use a spy so we can inject a concurrency error
PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager);
IcebergCatalog catalog = newIcebergCatalog(CATALOG_NAME, spyMetaStore);
catalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));

Namespace namespace = Namespace.of("parent", "child1");
TableIdentifier table = TableIdentifier.of(namespace, "table");

NotificationRequest request = new NotificationRequest();
request.setNotificationType(NotificationType.UPDATE);
TableUpdateNotification update = new TableUpdateNotification();
update.setMetadataLocation(tableMetadataLocation);
update.setTableName(table.name());
update.setTableUuid(UUID.randomUUID().toString());
update.setTimestamp(230950845L);
request.setPayload(update);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));

// Always invoke the real underlying method, but if it's a namespace we'll return
// ENTITY_ALREADY_EXISTS after doing so to simulate a different concurrent caller having
// been the one to succeed creating the namespace first.
doAnswer(
invocation -> {
PolarisEntity entity = (PolarisEntity) invocation.getArgument(2);
EntityResult result = (EntityResult) invocation.callRealMethod();
if (entity.getType() == PolarisEntityType.NAMESPACE) {
return new EntityResult(
BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS,
PolarisEntitySubType.NULL_SUBTYPE.getCode());
} else {
return result;
}
})
.when(spyMetaStore)
.createEntityIfNotExists(any(), any(), any());

Assertions.assertThat(catalog.sendNotification(table, request))
.as("Notification should be sent successfully")
.isTrue();
Assertions.assertThat(catalog.namespaceExists(namespace))
.as("Intermediate namespaces should be created")
.isTrue();
Assertions.assertThat(catalog.tableExists(table))
.as("Table should be created on receiving notification")
.isTrue();
}

@Test
public void testUpdateNotificationCreateTableInDisallowedLocation() {
Assumptions.assumeTrue(
Expand Down