From 6c679950ec99759ab87e9b1b949054e1754e4380 Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Tue, 18 Nov 2025 01:55:05 -0800 Subject: [PATCH 1/2] Revert "Remove unused polarisEventListener field from IcebergCatalogHandler (#3045)" This reverts commit 7071543b5a81f78891e5c78172aeee13a77e4b28. --- .../service/catalog/iceberg/IcebergCatalogAdapter.java | 5 +++++ .../service/catalog/iceberg/IcebergCatalogHandler.java | 4 ++++ .../catalog/iceberg/IcebergCatalogHandlerAuthzTest.java | 4 ++++ .../IcebergCatalogHandlerFineGrainedDisabledTest.java | 1 + .../java/org/apache/polaris/service/TestServices.java | 1 + 5 files changed, 15 insertions(+) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 1101ccbee5..352d1a81cc 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -84,6 +84,7 @@ import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider; import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; import org.apache.polaris.service.reporting.PolarisMetricsReporter; @@ -148,6 +149,7 @@ public class IcebergCatalogAdapter private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; private final Instance externalCatalogFactories; + private final PolarisEventListener polarisEventListener; private final StorageAccessConfigProvider storageAccessConfigProvider; private final PolarisMetricsReporter metricsReporter; @@ -166,6 +168,7 @@ public IcebergCatalogAdapter( ReservedProperties reservedProperties, CatalogHandlerUtils catalogHandlerUtils, @Any Instance externalCatalogFactories, + PolarisEventListener polarisEventListener, StorageAccessConfigProvider storageAccessConfigProvider, PolarisMetricsReporter metricsReporter) { this.diagnostics = diagnostics; @@ -182,6 +185,7 @@ public IcebergCatalogAdapter( this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; this.externalCatalogFactories = externalCatalogFactories; + this.polarisEventListener = polarisEventListener; this.storageAccessConfigProvider = storageAccessConfigProvider; this.metricsReporter = metricsReporter; } @@ -223,6 +227,7 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String reservedProperties, catalogHandlerUtils, externalCatalogFactories, + polarisEventListener, storageAccessConfigProvider); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 4f4ab0d1dc..d2e48d2cc0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -109,6 +109,7 @@ import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider; import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; import org.apache.polaris.service.types.NotificationRequest; @@ -137,6 +138,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final CallContextCatalogFactory catalogFactory; private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; + private final PolarisEventListener polarisEventListener; private final StorageAccessConfigProvider storageAccessConfigProvider; // Catalog instance will be initialized after authorizing resolver successfully resolves @@ -161,6 +163,7 @@ public IcebergCatalogHandler( ReservedProperties reservedProperties, CatalogHandlerUtils catalogHandlerUtils, Instance externalCatalogFactories, + PolarisEventListener polarisEventListener, StorageAccessConfigProvider storageAccessConfigProvider) { super( diagnostics, @@ -175,6 +178,7 @@ public IcebergCatalogHandler( this.catalogFactory = catalogFactory; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; + this.polarisEventListener = polarisEventListener; this.storageAccessConfigProvider = storageAccessConfigProvider; } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java index 0370575215..34118d6a71 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerAuthzTest.java @@ -133,6 +133,7 @@ private IcebergCatalogHandler newWrapper( reservedProperties, catalogHandlerUtils, emptyExternalCatalogFactory(), + polarisEventListener, storageAccessConfigProvider); } @@ -272,6 +273,7 @@ public void testInsufficientPermissionsPriorToSecretRotation() { reservedProperties, catalogHandlerUtils, emptyExternalCatalogFactory(), + polarisEventListener, storageAccessConfigProvider); // a variety of actions are all disallowed because the principal's credentials must be rotated @@ -309,6 +311,7 @@ public void testInsufficientPermissionsPriorToSecretRotation() { reservedProperties, catalogHandlerUtils, emptyExternalCatalogFactory(), + polarisEventListener, storageAccessConfigProvider); doTestSufficientPrivilegeSets( @@ -1185,6 +1188,7 @@ public T getConfig(PolarisConfiguration config, CatalogEntity catalogEnti reservedProperties, catalogHandlerUtils, emptyExternalCatalogFactory(), + polarisEventListener, storageAccessConfigProvider); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java index cc59fb0880..0b4fcc910e 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFineGrainedDisabledTest.java @@ -70,6 +70,7 @@ private IcebergCatalogHandler newWrapper() { reservedProperties, catalogHandlerUtils, emptyExternalCatalogFactory(), + polarisEventListener, storageAccessConfigProvider); } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 3d40493f8a..4c910ffbd6 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -319,6 +319,7 @@ public String getAuthenticationScheme() { reservedProperties, catalogHandlerUtils, externalCatalogFactory, + polarisEventListener, storageAccessConfigProvider, new DefaultMetricsReporter()); From b4ce2709488d725af18edc83157f5fa1bd8d303b Mon Sep 17 00:00:00 2001 From: adnanhemani Date: Tue, 18 Nov 2025 01:54:42 -0800 Subject: [PATCH 2/2] Fixes phantom event emission; Introduces StageCommitTableEvent --- .../TransactionWorkspaceMetaStoreManager.java | 49 ++++ ...nsactionWorkspaceMetaStoreManagerTest.java | 156 +++++++++++ .../catalog/iceberg/IcebergCatalog.java | 18 +- .../iceberg/IcebergCatalogHandler.java | 11 + .../events/IcebergRestCatalogEvents.java | 7 + .../listeners/PolarisEventListener.java | 2 + .../iceberg/AbstractIcebergCatalogTest.java | 6 + .../iceberg/CommitTransactionEventTest.java | 265 ++++++++++++++++++ .../apache/polaris/service/TestServices.java | 14 +- .../listeners/TestPolarisEventListener.java | 7 +- 10 files changed, 528 insertions(+), 7 deletions(-) create mode 100644 polaris-core/src/test/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManagerTest.java create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index b0c78c0b13..a8e7af36c9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.entity.LocationBasedEntity; @@ -88,6 +90,7 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan // to serve reads within the same transaction while also storing the ordered list of // pendingUpdates that ultimately need to be applied in order within the real MetaStoreManager. private final List pendingUpdates = new ArrayList<>(); + private final List pendingEvents = new ArrayList<>(); public TransactionWorkspaceMetaStoreManager( PolarisDiagnostics diagnostics, PolarisMetaStoreManager delegate) { @@ -99,6 +102,10 @@ public List getPendingUpdates() { return ImmutableList.copyOf(pendingUpdates); } + public List getPendingEvents() { + return ImmutableList.copyOf(pendingEvents); + } + @Override public BaseResult bootstrapPolarisService(@Nonnull PolarisCallContext callCtx) { diagnostics.fail("illegal_method_in_transaction_workspace", "bootstrapPolarisService"); @@ -456,4 +463,46 @@ public void writeEvents( @Nonnull PolarisCallContext callCtx, @Nonnull List polarisEvents) { diagnostics.fail("illegal_method_in_transaction_workspace", "writeEvents"); } + + public void stageEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata metadataBefore, + TableMetadata metadataAfter) { + pendingEvents.add(new StageEvent(catalogName, identifier, metadataBefore, metadataAfter)); + } + + public static class StageEvent { + private final String catalogName; + private final TableIdentifier identifier; + private final TableMetadata metadataBefore; + private final TableMetadata metadataAfter; + + public StageEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata metadataBefore, + TableMetadata metadataAfter) { + this.catalogName = catalogName; + this.identifier = identifier; + this.metadataBefore = metadataBefore; + this.metadataAfter = metadataAfter; + } + + public String catalogName() { + return catalogName; + } + + public TableIdentifier identifier() { + return identifier; + } + + public TableMetadata metadataBefore() { + return metadataBefore; + } + + public TableMetadata metadataAfter() { + return metadataAfter; + } + } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManagerTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManagerTest.java new file mode 100644 index 0000000000..421f7ca48c --- /dev/null +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManagerTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityCore; +import org.apache.polaris.core.persistence.dao.entity.EntityResult; +import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TransactionWorkspaceMetaStoreManagerTest { + + private TransactionWorkspaceMetaStoreManager transactionManager; + private PolarisDiagnostics diagnostics; + private PolarisMetaStoreManager delegate; + + @BeforeEach + void setUp() { + diagnostics = mock(PolarisDiagnostics.class); + delegate = mock(PolarisMetaStoreManager.class); + transactionManager = new TransactionWorkspaceMetaStoreManager(diagnostics, delegate); + } + + @Test + void testEntityUpdateFunctionality() { + PolarisCallContext callCtx = mock(PolarisCallContext.class); + + // Test with normal catalog path + List catalogPath = Arrays.asList(mock(PolarisEntityCore.class)); + PolarisBaseEntity entity1 = mock(PolarisBaseEntity.class); + + EntityResult result1 = + transactionManager.updateEntityPropertiesIfNotChanged(callCtx, catalogPath, entity1); + assertThat(result1).isNotNull(); + assertThat(result1.getEntity()).isEqualTo(entity1); + + // Test with null catalog path (top-level entity) + PolarisBaseEntity entity2 = mock(PolarisBaseEntity.class); + EntityResult result2 = + transactionManager.updateEntityPropertiesIfNotChanged(callCtx, null, entity2); + assertThat(result2.getEntity()).isEqualTo(entity2); + + // Test with empty catalog path + List emptyCatalogPath = Collections.emptyList(); + PolarisBaseEntity entity3 = mock(PolarisBaseEntity.class); + EntityResult result3 = + transactionManager.updateEntityPropertiesIfNotChanged(callCtx, emptyCatalogPath, entity3); + assertThat(result3.getEntity()).isEqualTo(entity3); + + // Verify all updates are staged correctly + List pendingUpdates = transactionManager.getPendingUpdates(); + assertThat(pendingUpdates).hasSize(3); + + assertThat(pendingUpdates.get(0).getCatalogPath()).isEqualTo(catalogPath); + assertThat(pendingUpdates.get(0).getEntity()).isEqualTo(entity1); + + assertThat(pendingUpdates.get(1).getCatalogPath()).isNull(); + assertThat(pendingUpdates.get(1).getEntity()).isEqualTo(entity2); + + assertThat(pendingUpdates.get(2).getCatalogPath()).isEqualTo(emptyCatalogPath); + assertThat(pendingUpdates.get(2).getEntity()).isEqualTo(entity3); + } + + @Test + void testStageEventFunctionality() { + // Test one event + String catalogName = "test_catalog"; + TableIdentifier identifier = TableIdentifier.of("namespace", "table"); + TableMetadata metadataBefore = mock(TableMetadata.class); + TableMetadata metadataAfter = mock(TableMetadata.class); + + transactionManager.stageEvent(catalogName, identifier, metadataBefore, metadataAfter); + + List events = + transactionManager.getPendingEvents(); + assertThat(events).hasSize(1); + + TransactionWorkspaceMetaStoreManager.StageEvent event = events.get(0); + assertThat(event.catalogName()).isEqualTo(catalogName); + assertThat(event.identifier()).isEqualTo(identifier); + assertThat(event.metadataBefore()).isEqualTo(metadataBefore); + assertThat(event.metadataAfter()).isEqualTo(metadataAfter); + + // Test another event + String catalog2 = "catalog2"; + TableIdentifier id2 = TableIdentifier.of("ns2", "table2"); + TableMetadata meta2Before = mock(TableMetadata.class); + TableMetadata meta2After = mock(TableMetadata.class); + + transactionManager.stageEvent(catalog2, id2, meta2Before, meta2After); + + events = transactionManager.getPendingEvents(); + assertThat(events).hasSize(2); + assertThat(events.get(1).catalogName()).isEqualTo(catalog2); + assertThat(events.get(1).identifier()).isEqualTo(id2); + } + + @Test + void testPendingUpdatesAndEventsTogether() { + PolarisCallContext callCtx = mock(PolarisCallContext.class); + PolarisBaseEntity entity = mock(PolarisBaseEntity.class); + List catalogPath = Arrays.asList(mock(PolarisEntityCore.class)); + + String catalogName = "test_catalog"; + TableIdentifier identifier = TableIdentifier.of("namespace", "table"); + TableMetadata metadataBefore = mock(TableMetadata.class); + TableMetadata metadataAfter = mock(TableMetadata.class); + + transactionManager.updateEntityPropertiesIfNotChanged(callCtx, catalogPath, entity); + transactionManager.stageEvent(catalogName, identifier, metadataBefore, metadataAfter); + + List pendingUpdates = transactionManager.getPendingUpdates(); + List pendingEvents = + transactionManager.getPendingEvents(); + + assertThat(pendingUpdates).hasSize(1); + assertThat(pendingEvents).hasSize(1); + + // Verify update + EntityWithPath update = pendingUpdates.get(0); + assertThat(update.getCatalogPath()).isEqualTo(catalogPath); + assertThat(update.getEntity()).isEqualTo(entity); + + // Verify event + TransactionWorkspaceMetaStoreManager.StageEvent event = pendingEvents.get(0); + assertThat(event.catalogName()).isEqualTo(catalogName); + assertThat(event.identifier()).isEqualTo(identifier); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index e8f5402b1b..3a9b62bc75 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -108,6 +108,7 @@ import org.apache.polaris.core.exceptions.CommitConflictException; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; @@ -1553,9 +1554,20 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { updateTableLike(tableIdentifier, entity); } - polarisEventListener.onAfterCommitTable( - new IcebergRestCatalogEvents.AfterCommitTableEvent( - catalogName, tableIdentifier, base, metadata)); + if (getMetaStoreManager() instanceof TransactionWorkspaceMetaStoreManager) { + // This operation was a staged commit and will be applied later + TransactionWorkspaceMetaStoreManager txMgr = + (TransactionWorkspaceMetaStoreManager) getMetaStoreManager(); + txMgr.stageEvent(catalogName, tableIdentifier, base, metadata); + + polarisEventListener.onStageCommitTable( + new IcebergRestCatalogEvents.StageCommitTableEvent( + catalogName, tableIdentifier, base, metadata)); + } else { + polarisEventListener.onAfterCommitTable( + new IcebergRestCatalogEvents.AfterCommitTableEvent( + catalogName, tableIdentifier, base, metadata)); + } } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index d2e48d2cc0..f922c804fb 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -109,6 +109,7 @@ import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider; import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; @@ -1060,6 +1061,16 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest) "Transaction commit failed with status: %s, extraInfo: %s", result.getReturnStatus(), result.getExtraInformation()); } + + for (TransactionWorkspaceMetaStoreManager.StageEvent event : + transactionMetaStoreManager.getPendingEvents()) { + polarisEventListener.onAfterCommitTable( + new IcebergRestCatalogEvents.AfterCommitTableEvent( + event.catalogName(), + event.identifier(), + event.metadataBefore(), + event.metadataAfter())); + } } public ListTablesResponse listViews(Namespace namespace, String pageToken, Integer pageSize) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java index 977b8ea7a5..ba8a095e42 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java @@ -270,6 +270,13 @@ public record BeforeCommitTableEvent( TableMetadata metadataAfter) implements PolarisEvent {} + public record StageCommitTableEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata metadataBefore, + TableMetadata metadataAfter) + implements PolarisEvent {} + public record AfterCommitTableEvent( String catalogName, TableIdentifier identifier, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java index 113dcd0d91..bc046eee2a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java @@ -257,6 +257,8 @@ default void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent e default void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {} + default void onStageCommitTable(IcebergRestCatalogEvents.StageCommitTableEvent event) {} + default void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {} default void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index dd7d7609f4..6aae79f680 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -2383,6 +2383,12 @@ public void testEventsAreEmitted() { Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE); Assertions.assertThat(afterTableEvent.metadataBefore().properties().get(key)).isEqualTo(valOld); Assertions.assertThat(afterTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew); + + Assertions.assertThatThrownBy( + () -> + testPolarisEventListener.getLatest( + IcebergRestCatalogEvents.StageCommitTableEvent.class)) + .isInstanceOf(IllegalStateException.class); } private static PageToken nextRequest(Page previousPage) { diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java new file mode 100644 index 0000000000..ba5189bedb --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.catalog.iceberg; + +import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import jakarta.ws.rs.core.Response; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.CreateCatalogRequest; +import org.apache.polaris.core.admin.model.FileStorageConfigInfo; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class CommitTransactionEventTest { + private static final String namespace = "ns"; + private static final String catalog = "test-catalog"; + private static final String propertyName = "custom-property-1"; + + private String catalogLocation; + + @BeforeEach + public void setUp(@TempDir Path tempDir) { + catalogLocation = tempDir.toAbsolutePath().toUri().toString(); + if (catalogLocation.endsWith("/")) { + catalogLocation = catalogLocation.substring(0, catalogLocation.length() - 1); + } + } + + @Test + void testEventsForSuccessfulTransaction() { + TestServices testServices = createTestServices(false); + createCatalogAndNamespace(testServices, Map.of(), catalogLocation); + + executeTransactionTest("test-table-1", "test-table-2", testServices); + + // Verify that all (Before/After/Stage)CommitTable events were emitted + TestPolarisEventListener testEventListener = + (TestPolarisEventListener) testServices.polarisEventListener(); + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.BeforeCommitTableEvent.class) + .identifier() + .name()) + .isEqualTo("test-table-2"); + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.StageCommitTableEvent.class) + .identifier() + .name()) + .isEqualTo("test-table-2"); + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class) + .identifier() + .name()) + .isEqualTo("test-table-2"); + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class) + .metadataAfter() + .properties()) + .containsKey(propertyName); + } + + @Test + void testEventsForUnSuccessfulTransaction() { + TestServices testServices = createTestServices(true); + createCatalogAndNamespace(testServices, Map.of(), catalogLocation); + + executeTransactionTest("test-table-3", "test-table-4", testServices); + + // Verify that all (Before/Stage)CommitTable events were emitted + TestPolarisEventListener testEventListener = + (TestPolarisEventListener) testServices.polarisEventListener(); + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.BeforeCommitTableEvent.class) + .identifier() + .name()) + .isEqualTo("test-table-4"); + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.StageCommitTableEvent.class) + .identifier() + .name()) + .isEqualTo("test-table-4"); + + // Verify that the AfterCommitTable events that were emitted were for the earlier create table + // call and therefore, we did not emit an AfterCommitTableEvent for the failed transaction + assertThat( + testEventListener + .getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class) + .metadataBefore()) + .isNull(); + } + + private void createCatalogAndNamespace( + TestServices services, Map catalogConfig, String catalogLocation) { + CatalogProperties.Builder propertiesBuilder = + CatalogProperties.builder() + .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, catalog)) + .putAll(catalogConfig); + + StorageConfigInfo config = + FileStorageConfigInfo.builder() + .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE) + .build(); + Catalog catalogObject = + new Catalog( + Catalog.TypeEnum.INTERNAL, catalog, propertiesBuilder.build(), 0L, 0L, 1, config); + try (Response response = + services + .catalogsApi() + .createCatalog( + new CreateCatalogRequest(catalogObject), + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + + CreateNamespaceRequest createNamespaceRequest = + CreateNamespaceRequest.builder().withNamespace(Namespace.of(namespace)).build(); + try (Response response = + services + .restApi() + .createNamespace( + catalog, + createNamespaceRequest, + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + } + } + + private void createTable(TestServices services, String tableName, String baseLocation) { + CreateTableRequest createTableRequest = + CreateTableRequest.builder() + .withName(tableName) + .withLocation(String.format("%s/%s/%s/%s", baseLocation, catalog, namespace, tableName)) + .withSchema(SCHEMA) + .build(); + services + .restApi() + .createTable( + catalog, + namespace, + createTableRequest, + null, + services.realmContext(), + services.securityContext()); + } + + /** + * Creates TestServices with optional hijacking for failure simulation. + * + * @param shouldFail if true, creates services that will fail during transaction commit + */ + private TestServices createTestServices(boolean shouldFail) { + Map config = + Map.of( + "ALLOW_INSECURE_STORAGE_TYPES", + "true", + "SUPPORTED_CATALOG_STORAGE_TYPES", + List.of("FILE")); + TestServices testServices = TestServices.builder().config(config).build(); + if (!shouldFail) { + return testServices; + } + + // Create a spy on the existing testServices' metastore manager for failure simulation + PolarisMetaStoreManager spyMetaStoreManager = spy(testServices.metaStoreManager()); + + // Return an error when trying to apply the updates on the spy + doReturn(new EntitiesResult(BaseResult.ReturnStatus.ENTITY_NOT_FOUND, "")) + .when(spyMetaStoreManager) + .updateEntitiesPropertiesIfNotChanged(any(), any()); + + MetaStoreManagerFactory metaStoreManagerFactorySpy = + spy(testServices.metaStoreManagerFactory()); + doReturn(spyMetaStoreManager) + .when(metaStoreManagerFactorySpy) + .getOrCreateMetaStoreManager(any()); + + return TestServices.builder() + .metaStoreManagerFactory(metaStoreManagerFactorySpy) + .config(config) + .build(); + } + + /** + * Executes a transaction test with the specified parameters. + * + * @param table1Name name of the first table + * @param table2Name name of the second table + * @param testServices TestServices object that will be operated on + */ + private void executeTransactionTest( + String table1Name, String table2Name, TestServices testServices) { + // Setup the test tables + createTable(testServices, table1Name, catalogLocation); + createTable(testServices, table2Name, catalogLocation); + + CommitTransactionRequest commitRequest = + new CommitTransactionRequest( + List.of( + UpdateTableRequest.create( + TableIdentifier.of(namespace, table1Name), + List.of(), + List.of(new MetadataUpdate.SetProperties(Map.of(propertyName, "value1")))), + UpdateTableRequest.create( + TableIdentifier.of(namespace, table2Name), + List.of(), + List.of(new MetadataUpdate.SetProperties(Map.of(propertyName, "value2")))))); + + // Ignore any errors that occur during transaction commit + try { + testServices + .restApi() + .commitTransaction( + catalog, commitRequest, testServices.realmContext(), testServices.securityContext()); + } catch (Exception ignored) { + } + } +} diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 4c910ffbd6..391204a352 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -144,6 +144,7 @@ public static class Builder { private Map config = Map.of(); private StsClient stsClient; private Supplier fileIOFactorySupplier = MeasuredFileIOFactory::new; + private MetaStoreManagerFactory metaStoreManagerFactory = null; private Builder() { stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS); @@ -178,6 +179,11 @@ public Builder stsClient(StsClient stsClient) { return this; } + public Builder metaStoreManagerFactory(MetaStoreManagerFactory metaStoreManagerFactory) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + return this; + } + public TestServices build() { PolarisConfigurationStore configurationStore = new MockedConfigurationStore(config); PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class); @@ -188,9 +194,11 @@ public TestServices build() { (destination) -> stsClient, Optional.empty(), () -> GoogleCredentials.create(new AccessToken(GCP_ACCESS_TOKEN, new Date()))); - InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory = - new InMemoryPolarisMetaStoreManagerFactory( - clock, diagnostics, storageIntegrationProvider); + MetaStoreManagerFactory metaStoreManagerFactory = + this.metaStoreManagerFactory == null + ? new InMemoryPolarisMetaStoreManagerFactory( + clock, diagnostics, storageIntegrationProvider) + : this.metaStoreManagerFactory; StorageCredentialCacheConfig storageCredentialCacheConfig = () -> 10_000; StorageCredentialCache storageCredentialCache = diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java index a10abcbb1c..6b228fbdbe 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java @@ -31,7 +31,7 @@ import org.apache.polaris.service.events.PrincipalRolesServiceEvents; import org.apache.polaris.service.events.PrincipalsServiceEvents; -/** Test event listener that stores all emitted events forever. */ +/** Test event listener that stores the last event of each type. */ public class TestPolarisEventListener implements PolarisEventListener { private final Map, PolarisEvent> latestEvents = new ConcurrentHashMap<>(); @@ -496,6 +496,11 @@ public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent recordEvent(event); } + @Override + public void onStageCommitTable(IcebergRestCatalogEvents.StageCommitTableEvent event) { + recordEvent(event); + } + @Override public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) { recordEvent(event);