Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.LocationBasedEntity;
Expand Down Expand Up @@ -88,6 +90,7 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan
// to serve reads within the same transaction while also storing the ordered list of
// pendingUpdates that ultimately need to be applied in order within the real MetaStoreManager.
private final List<EntityWithPath> pendingUpdates = new ArrayList<>();
private final List<StageEvent> pendingEvents = new ArrayList<>();

public TransactionWorkspaceMetaStoreManager(
PolarisDiagnostics diagnostics, PolarisMetaStoreManager delegate) {
Expand All @@ -99,6 +102,10 @@ public List<EntityWithPath> getPendingUpdates() {
return ImmutableList.copyOf(pendingUpdates);
}

public List<StageEvent> getPendingEvents() {
return ImmutableList.copyOf(pendingEvents);
}

@Override
public BaseResult bootstrapPolarisService(@Nonnull PolarisCallContext callCtx) {
diagnostics.fail("illegal_method_in_transaction_workspace", "bootstrapPolarisService");
Expand Down Expand Up @@ -456,4 +463,46 @@ public void writeEvents(
@Nonnull PolarisCallContext callCtx, @Nonnull List<PolarisEvent> polarisEvents) {
diagnostics.fail("illegal_method_in_transaction_workspace", "writeEvents");
}

public void stageEvent(
String catalogName,
TableIdentifier identifier,
TableMetadata metadataBefore,
TableMetadata metadataAfter) {
pendingEvents.add(new StageEvent(catalogName, identifier, metadataBefore, metadataAfter));
}

public static class StageEvent {
private final String catalogName;
private final TableIdentifier identifier;
private final TableMetadata metadataBefore;
private final TableMetadata metadataAfter;

public StageEvent(
String catalogName,
TableIdentifier identifier,
TableMetadata metadataBefore,
TableMetadata metadataAfter) {
this.catalogName = catalogName;
this.identifier = identifier;
this.metadataBefore = metadataBefore;
this.metadataAfter = metadataAfter;
}

public String catalogName() {
return catalogName;
}

public TableIdentifier identifier() {
return identifier;
}

public TableMetadata metadataBefore() {
return metadataBefore;
}

public TableMetadata metadataAfter() {
return metadataAfter;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.persistence;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityCore;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityWithPath;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class TransactionWorkspaceMetaStoreManagerTest {

private TransactionWorkspaceMetaStoreManager transactionManager;
private PolarisDiagnostics diagnostics;
private PolarisMetaStoreManager delegate;

@BeforeEach
void setUp() {
diagnostics = mock(PolarisDiagnostics.class);
delegate = mock(PolarisMetaStoreManager.class);
transactionManager = new TransactionWorkspaceMetaStoreManager(diagnostics, delegate);
}

@Test
void testEntityUpdateFunctionality() {
PolarisCallContext callCtx = mock(PolarisCallContext.class);

// Test with normal catalog path
List<PolarisEntityCore> catalogPath = Arrays.asList(mock(PolarisEntityCore.class));
PolarisBaseEntity entity1 = mock(PolarisBaseEntity.class);

EntityResult result1 =
transactionManager.updateEntityPropertiesIfNotChanged(callCtx, catalogPath, entity1);
assertThat(result1).isNotNull();
assertThat(result1.getEntity()).isEqualTo(entity1);

// Test with null catalog path (top-level entity)
PolarisBaseEntity entity2 = mock(PolarisBaseEntity.class);
EntityResult result2 =
transactionManager.updateEntityPropertiesIfNotChanged(callCtx, null, entity2);
assertThat(result2.getEntity()).isEqualTo(entity2);

// Test with empty catalog path
List<PolarisEntityCore> emptyCatalogPath = Collections.emptyList();
PolarisBaseEntity entity3 = mock(PolarisBaseEntity.class);
EntityResult result3 =
transactionManager.updateEntityPropertiesIfNotChanged(callCtx, emptyCatalogPath, entity3);
assertThat(result3.getEntity()).isEqualTo(entity3);

// Verify all updates are staged correctly
List<EntityWithPath> pendingUpdates = transactionManager.getPendingUpdates();
assertThat(pendingUpdates).hasSize(3);

assertThat(pendingUpdates.get(0).getCatalogPath()).isEqualTo(catalogPath);
assertThat(pendingUpdates.get(0).getEntity()).isEqualTo(entity1);

assertThat(pendingUpdates.get(1).getCatalogPath()).isNull();
assertThat(pendingUpdates.get(1).getEntity()).isEqualTo(entity2);

assertThat(pendingUpdates.get(2).getCatalogPath()).isEqualTo(emptyCatalogPath);
assertThat(pendingUpdates.get(2).getEntity()).isEqualTo(entity3);
}

@Test
void testStageEventFunctionality() {
// Test one event
String catalogName = "test_catalog";
TableIdentifier identifier = TableIdentifier.of("namespace", "table");
TableMetadata metadataBefore = mock(TableMetadata.class);
TableMetadata metadataAfter = mock(TableMetadata.class);

transactionManager.stageEvent(catalogName, identifier, metadataBefore, metadataAfter);

List<TransactionWorkspaceMetaStoreManager.StageEvent> events =
transactionManager.getPendingEvents();
assertThat(events).hasSize(1);

TransactionWorkspaceMetaStoreManager.StageEvent event = events.get(0);
assertThat(event.catalogName()).isEqualTo(catalogName);
assertThat(event.identifier()).isEqualTo(identifier);
assertThat(event.metadataBefore()).isEqualTo(metadataBefore);
assertThat(event.metadataAfter()).isEqualTo(metadataAfter);

// Test another event
String catalog2 = "catalog2";
TableIdentifier id2 = TableIdentifier.of("ns2", "table2");
TableMetadata meta2Before = mock(TableMetadata.class);
TableMetadata meta2After = mock(TableMetadata.class);

transactionManager.stageEvent(catalog2, id2, meta2Before, meta2After);

events = transactionManager.getPendingEvents();
assertThat(events).hasSize(2);
assertThat(events.get(1).catalogName()).isEqualTo(catalog2);
assertThat(events.get(1).identifier()).isEqualTo(id2);
}

@Test
void testPendingUpdatesAndEventsTogether() {
PolarisCallContext callCtx = mock(PolarisCallContext.class);
PolarisBaseEntity entity = mock(PolarisBaseEntity.class);
List<PolarisEntityCore> catalogPath = Arrays.asList(mock(PolarisEntityCore.class));

String catalogName = "test_catalog";
TableIdentifier identifier = TableIdentifier.of("namespace", "table");
TableMetadata metadataBefore = mock(TableMetadata.class);
TableMetadata metadataAfter = mock(TableMetadata.class);

transactionManager.updateEntityPropertiesIfNotChanged(callCtx, catalogPath, entity);
transactionManager.stageEvent(catalogName, identifier, metadataBefore, metadataAfter);

List<EntityWithPath> pendingUpdates = transactionManager.getPendingUpdates();
List<TransactionWorkspaceMetaStoreManager.StageEvent> pendingEvents =
transactionManager.getPendingEvents();

assertThat(pendingUpdates).hasSize(1);
assertThat(pendingEvents).hasSize(1);

// Verify update
EntityWithPath update = pendingUpdates.get(0);
assertThat(update.getCatalogPath()).isEqualTo(catalogPath);
assertThat(update.getEntity()).isEqualTo(entity);

// Verify event
TransactionWorkspaceMetaStoreManager.StageEvent event = pendingEvents.get(0);
assertThat(event.catalogName()).isEqualTo(catalogName);
assertThat(event.identifier()).isEqualTo(identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.polaris.core.exceptions.CommitConflictException;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
Expand Down Expand Up @@ -1553,9 +1554,20 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
updateTableLike(tableIdentifier, entity);
}

polarisEventListener.onAfterCommitTable(
new IcebergRestCatalogEvents.AfterCommitTableEvent(
catalogName, tableIdentifier, base, metadata));
if (getMetaStoreManager() instanceof TransactionWorkspaceMetaStoreManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if + instanceof looks concerning to me. I do believe the whole events design can and should be done in a way that processing logic does not require knowing specific types of pluggable components.

TransactionalWorkspaceMetaStoreManager in itself has API design concerns from my POV, so I believe this area of the code needs refactoring in general, so adding logic that depends on specific types is both risky and complicates future refactoring.

// This operation was a staged commit and will be applied later
TransactionWorkspaceMetaStoreManager txMgr =
(TransactionWorkspaceMetaStoreManager) getMetaStoreManager();
txMgr.stageEvent(catalogName, tableIdentifier, base, metadata);

polarisEventListener.onStageCommitTable(
new IcebergRestCatalogEvents.StageCommitTableEvent(
catalogName, tableIdentifier, base, metadata));
} else {
polarisEventListener.onAfterCommitTable(
new IcebergRestCatalogEvents.AfterCommitTableEvent(
catalogName, tableIdentifier, base, metadata));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.http.IcebergHttpUtil;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.reporting.PolarisMetricsReporter;
Expand Down Expand Up @@ -148,6 +149,7 @@ public class IcebergCatalogAdapter
private final ReservedProperties reservedProperties;
private final CatalogHandlerUtils catalogHandlerUtils;
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
private final PolarisEventListener polarisEventListener;
private final StorageAccessConfigProvider storageAccessConfigProvider;
private final PolarisMetricsReporter metricsReporter;

Expand All @@ -166,6 +168,7 @@ public IcebergCatalogAdapter(
ReservedProperties reservedProperties,
CatalogHandlerUtils catalogHandlerUtils,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories,
PolarisEventListener polarisEventListener,
StorageAccessConfigProvider storageAccessConfigProvider,
PolarisMetricsReporter metricsReporter) {
this.diagnostics = diagnostics;
Expand All @@ -182,6 +185,7 @@ public IcebergCatalogAdapter(
this.reservedProperties = reservedProperties;
this.catalogHandlerUtils = catalogHandlerUtils;
this.externalCatalogFactories = externalCatalogFactories;
this.polarisEventListener = polarisEventListener;
this.storageAccessConfigProvider = storageAccessConfigProvider;
this.metricsReporter = metricsReporter;
}
Expand Down Expand Up @@ -223,6 +227,7 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String
reservedProperties,
catalogHandlerUtils,
externalCatalogFactories,
polarisEventListener,
storageAccessConfigProvider);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider;
import org.apache.polaris.service.config.ReservedProperties;
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.events.IcebergRestCatalogEvents;
import org.apache.polaris.service.events.listeners.PolarisEventListener;
import org.apache.polaris.service.http.IcebergHttpUtil;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.types.NotificationRequest;
Expand Down Expand Up @@ -137,6 +139,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
private final CallContextCatalogFactory catalogFactory;
private final ReservedProperties reservedProperties;
private final CatalogHandlerUtils catalogHandlerUtils;
private final PolarisEventListener polarisEventListener;
private final StorageAccessConfigProvider storageAccessConfigProvider;

// Catalog instance will be initialized after authorizing resolver successfully resolves
Expand All @@ -161,6 +164,7 @@ public IcebergCatalogHandler(
ReservedProperties reservedProperties,
CatalogHandlerUtils catalogHandlerUtils,
Instance<ExternalCatalogFactory> externalCatalogFactories,
PolarisEventListener polarisEventListener,
StorageAccessConfigProvider storageAccessConfigProvider) {
super(
diagnostics,
Expand All @@ -175,6 +179,7 @@ public IcebergCatalogHandler(
this.catalogFactory = catalogFactory;
this.reservedProperties = reservedProperties;
this.catalogHandlerUtils = catalogHandlerUtils;
this.polarisEventListener = polarisEventListener;
this.storageAccessConfigProvider = storageAccessConfigProvider;
}

Expand Down Expand Up @@ -1056,6 +1061,16 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest)
"Transaction commit failed with status: %s, extraInfo: %s",
result.getReturnStatus(), result.getExtraInformation());
}

for (TransactionWorkspaceMetaStoreManager.StageEvent event :
transactionMetaStoreManager.getPendingEvents()) {
polarisEventListener.onAfterCommitTable(
new IcebergRestCatalogEvents.AfterCommitTableEvent(
event.catalogName(),
event.identifier(),
event.metadataBefore(),
event.metadataAfter()));
}
}

public ListTablesResponse listViews(Namespace namespace, String pageToken, Integer pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ public record BeforeCommitTableEvent(
TableMetadata metadataAfter)
implements PolarisEvent {}

public record StageCommitTableEvent(
String catalogName,
TableIdentifier identifier,
TableMetadata metadataBefore,
TableMetadata metadataAfter)
implements PolarisEvent {}

public record AfterCommitTableEvent(
String catalogName,
TableIdentifier identifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ default void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent e

default void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {}

default void onStageCommitTable(IcebergRestCatalogEvents.StageCommitTableEvent event) {}

default void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {}

default void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,12 @@ public void testEventsAreEmitted() {
Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE);
Assertions.assertThat(afterTableEvent.metadataBefore().properties().get(key)).isEqualTo(valOld);
Assertions.assertThat(afterTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew);

Assertions.assertThatThrownBy(
() ->
testPolarisEventListener.getLatest(
IcebergRestCatalogEvents.StageCommitTableEvent.class))
.isInstanceOf(IllegalStateException.class);
}

private static PageToken nextRequest(Page<?> previousPage) {
Expand Down
Loading