From b4fd19ced4fb3786f9f4f83d295e2195351a0e0f Mon Sep 17 00:00:00 2001 From: Dennis Huo Date: Fri, 26 Sep 2025 05:00:21 +0000 Subject: [PATCH 1/2] Fix a race condition in sendNotification where concurrent parent-namespace creation causes failures The semantics of the createNonExistingNamespaces method used during sendNotification were supposed to be "create if needed". However, the behavior ended up surfacing an AlreadyExistsException if multiple concurrent sendNotification attempts were made for a brand-new namespace (where the notifications may be different tables). This would cause a table sync to fail if a sibling table was being synced at the same time, even though the new table should successfully get created under the shared namespace. --- .../catalog/iceberg/IcebergCatalog.java | 11 ++- .../iceberg/AbstractIcebergCatalogTest.java | 68 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index ac5d86524f..0e1a61d641 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -2579,7 +2579,16 @@ private void createNonExistingNamespaces(Namespace namespace) { Namespace parentNamespace = PolarisCatalogHelpers.getParentNamespace(nsLevel); PolarisResolvedPathWrapper resolvedParent = resolvedEntityView.getPassthroughResolvedPath(parentNamespace); - createNamespaceInternal(nsLevel, Collections.emptyMap(), resolvedParent); + try { + createNamespaceInternal(nsLevel, Collections.emptyMap(), resolvedParent); + } catch (AlreadyExistsException aee) { + LOGGER + .atInfo() + .setCause(aee) + .addKeyValue("namespace", namespace) + .log( + "Namespace already exists in createNonExistingNamespace; possible race condition"); + } } } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 3d66c4858c..1c9a43898c 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Fail.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; @@ -105,6 +106,7 @@ import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PrincipalEntity; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.exceptions.CommitConflictException; @@ -1059,6 +1061,72 @@ public void testUpdateNotificationWhenTableAndNamespacesDontExist() { .isTrue(); } + @Test + public void testUpdateNotificationWhenTableAndNamespacesDontExistNamespaceRaceCondition() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + Assumptions.assumeTrue( + supportsNestedNamespaces(), "Only applicable if nested namespaces are supported"); + Assumptions.assumeTrue( + supportsNotifications(), "Only applicable if notifications are supported"); + + final String tableLocation = "s3://externally-owned-bucket/table/"; + final String tableMetadataLocation = tableLocation + "metadata/v1.metadata.json"; + + // Use a spy so we can inject a concurrency error + PolarisMetaStoreManager spyMetaStore = spy(metaStoreManager); + IcebergCatalog catalog = newIcebergCatalog(CATALOG_NAME, spyMetaStore); + catalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + Namespace namespace = Namespace.of("parent", "child1"); + TableIdentifier table = TableIdentifier.of(namespace, "table"); + + NotificationRequest request = new NotificationRequest(); + request.setNotificationType(NotificationType.UPDATE); + TableUpdateNotification update = new TableUpdateNotification(); + update.setMetadataLocation(tableMetadataLocation); + update.setTableName(table.name()); + update.setTableUuid(UUID.randomUUID().toString()); + update.setTimestamp(230950845L); + request.setPayload(update); + + fileIO.addFile( + tableMetadataLocation, + TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8)); + + // Always invoke the real underlying method, but if it's a namespace we'll return + // ENTITY_ALREADY_EXISTS after doing so to simulate a different concurrent caller having + // been the one to succeed creating the namespace first. + doAnswer( + invocation -> { + PolarisEntity entity = (PolarisEntity) invocation.getArgument(2); + EntityResult result = (EntityResult) invocation.callRealMethod(); + if (entity.getType() == PolarisEntityType.NAMESPACE) { + return new EntityResult( + BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS, + PolarisEntitySubType.NULL_SUBTYPE.getCode()); + } else { + return result; + } + }) + .when(spyMetaStore) + .createEntityIfNotExists(any(), any(), any()); + + Assertions.assertThat(catalog.sendNotification(table, request)) + .as("Notification should be sent successfully") + .isTrue(); + Assertions.assertThat(catalog.namespaceExists(namespace)) + .as("Intermediate namespaces should be created") + .isTrue(); + Assertions.assertThat(catalog.tableExists(table)) + .as("Table should be created on receiving notification") + .isTrue(); + } + @Test public void testUpdateNotificationCreateTableInDisallowedLocation() { Assumptions.assumeTrue( From 35579fcf1bb3ad5bba8ec000b5b1a100e1807bd1 Mon Sep 17 00:00:00 2001 From: Dennis Huo Date: Fri, 26 Sep 2025 20:52:58 +0000 Subject: [PATCH 2/2] Also better future-proof the createNamespaceInternal logic by explicitly checking for ENTITY_ALREADY_EXISTS, per review suggestion. Log a less scary message since it's not an error scenario type of race condition, per review suggestion --- .../catalog/iceberg/IcebergCatalog.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 0e1a61d641..8d237fb9e2 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -71,6 +71,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.ServiceFailureException; import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; @@ -508,16 +509,21 @@ private void createNamespaceInternal( BehaviorChangeConfiguration.ALLOW_NAMESPACE_CUSTOM_LOCATION, catalogEntity)) { validateNamespaceLocation(entity, resolvedParent); } - PolarisEntity returnedEntity = - PolarisEntity.of( - getMetaStoreManager() - .createEntityIfNotExists( - getCurrentPolarisContext(), - PolarisEntity.toCoreList(resolvedParent.getRawFullPath()), - entity)); - if (returnedEntity == null) { - throw new AlreadyExistsException( - "Cannot create namespace %s. Namespace already exists", namespace); + EntityResult result = + getMetaStoreManager() + .createEntityIfNotExists( + getCurrentPolarisContext(), + PolarisEntity.toCoreList(resolvedParent.getRawFullPath()), + entity); + if (!result.isSuccess()) { + if (result.alreadyExists()) { + throw new AlreadyExistsException( + "Cannot create namespace %s. Namespace already exists", namespace); + } else { + throw new ServiceFailureException( + "Unexpected error trying to create namespace %s. Status: %s ExtraInfo: %s", + namespace, result.getReturnStatus(), result.getExtraInformation()); + } } } @@ -2582,12 +2588,19 @@ private void createNonExistingNamespaces(Namespace namespace) { try { createNamespaceInternal(nsLevel, Collections.emptyMap(), resolvedParent); } catch (AlreadyExistsException aee) { + // Since we only attempted to create the namespace after checking that + // getPassthroughResolvedPath for this level is null, this should be a relatively + // infrequent case during high concurrency where another notification already + // conveniently created the namespace between the time we checked and the time + // we attempted to fill it in. It's working as intended in this case to simply + // continue with the existing namespace, but the fact that this collision occurred + // may be relevant to someone running the service in case of unexpected interactions, + // so we'll still log the fact that this happened. LOGGER .atInfo() .setCause(aee) .addKeyValue("namespace", namespace) - .log( - "Namespace already exists in createNonExistingNamespace; possible race condition"); + .log("Namespace already exists in createNonExistingNamespace"); } } }