Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti

- The EclipseLink Persistence implementation has been completely removed.
- The default request ID header name has changed from `Polaris-Request-Id` to `X-Request-ID`.
- The (Before/After)CommitTableEvent has been removed.

### New Features

Expand All @@ -64,7 +65,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
- Relaxed the requirements for S3 storage's ARN to allow Polaris to connect to more non-AWS S3 storage appliances.
- Added checksum to helm deployment so that it will restart when the configmap has changed.
- Generic Table is no longer in beta and is generally-available.
- Added Windows support for Python client
- Added Windows support for Python client.
- (Before/After)UpdateTableEvent is emitted for all table updates within a transaction.

### Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1408,10 +1408,6 @@ public void doRefresh() {
}

public void doCommit(TableMetadata base, TableMetadata metadata) {
polarisEventListener.onBeforeCommitTable(
new IcebergRestCatalogEvents.BeforeCommitTableEvent(
eventMetadataFactory.create(), catalogName, tableIdentifier, base, metadata));

LOGGER.debug(
"doCommit for table {} with metadataBefore {}, metadataAfter {}",
tableIdentifier,
Expand Down Expand Up @@ -1560,10 +1556,6 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
} else {
updateTableLike(tableIdentifier, entity);
}

polarisEventListener.onAfterCommitTable(
new IcebergRestCatalogEvents.AfterCommitTableEvent(
eventMetadataFactory.create(), catalogName, tableIdentifier, base, metadata));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.polaris.service.catalog.iceberg;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
Expand All @@ -34,6 +35,7 @@
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
Expand Down Expand Up @@ -106,6 +108,22 @@ public class IcebergRestCatalogEventServiceDelegator
@Inject PolarisEventMetadataFactory eventMetadataFactory;
@Inject CatalogPrefixParser prefixParser;

// Constructor for testing - allows manual dependency injection
@VisibleForTesting
public IcebergRestCatalogEventServiceDelegator(
IcebergCatalogAdapter delegate,
PolarisEventListener polarisEventListener,
PolarisEventMetadataFactory eventMetadataFactory,
CatalogPrefixParser prefixParser) {
this.delegate = delegate;
this.polarisEventListener = polarisEventListener;
this.eventMetadataFactory = eventMetadataFactory;
this.prefixParser = prefixParser;
}

// Default constructor for CDI
public IcebergRestCatalogEventServiceDelegator() {}

@Override
public Response createNamespace(
String prefix,
Expand Down Expand Up @@ -597,11 +615,30 @@ public Response commitTransaction(
polarisEventListener.onBeforeCommitTransaction(
new IcebergRestCatalogEvents.BeforeCommitTransactionEvent(
eventMetadataFactory.create(), catalogName, commitTransactionRequest));
for (UpdateTableRequest req : commitTransactionRequest.tableChanges()) {
polarisEventListener.onBeforeUpdateTable(
new BeforeUpdateTableEvent(
eventMetadataFactory.create(),
catalogName,
req.identifier().namespace(),
req.identifier().name(),
req));
}
Response resp =
delegate.commitTransaction(prefix, commitTransactionRequest, realmContext, securityContext);
polarisEventListener.onAfterCommitTransaction(
new IcebergRestCatalogEvents.AfterCommitTransactionEvent(
eventMetadataFactory.create(), catalogName, commitTransactionRequest));
for (UpdateTableRequest req : commitTransactionRequest.tableChanges()) {
polarisEventListener.onAfterUpdateTable(
new AfterUpdateTableEvent(
eventMetadataFactory.create(),
catalogName,
req.identifier().namespace(),
req.identifier().name(),
req,
null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar events emitted from updateTable have a value for the last parameter (LoadTableResponse), but here it is null.

It that parameter useful to the consumer when it is not available in all situations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented about this earlier: #3195 (comment)

We should get back to the state where this is not null soon. But for that, we will have to solve the larger issue in #3209. I think it is fine to keep this code for now and get back to working on #3209 (hopefully in the near future) to fix this null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for missing the earlier comment 🤦

If our intention is to provide the same set of parameters (non-null) to events of the same type in all situations, then given that this PR deviates from that goal, I believe we ought to mark this on the javadoc of AfterUpdateTableEvent at least.

A CHANGELOG entry is not required, IMHO, since it's developer-level change at this stage.

That said, the plan SGTM 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal preference would be to remove LoadTableResponse from AfterUpdateTableEvent in this PR (in all cases) and re-add when we can provide it in all contexts. I think it would provide a more coherent migration path from the consumer's POV.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal preference would be to remove LoadTableResponse from AfterUpdateTableEvent in this PR (in all cases) and re-add when we can provide it in all contexts.

This is fair too. I was trying to reduce the amount of changes to the event structure itself and use the fact that this is null (only in the case of transactions, to be clear) to place urgency in trying to fix #3209. I'll make the change for the Javadoc - let me know if removing the LoadTableResponse is a blocking concern for you. Else, I'd prefer to keep this approach the way it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx - LGTM 👍

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the right behavior for multi-table transactions. Emitting AfterUpdateTableEvent only after the entire transaction succeeds ensures the correct semantics and avoids exposing partial state. Thanks for making this change.

If there’s interest in finer-grained events for multi-table transactions (e.g., per-table events that fire even before the full transaction commits), we could explore that as a follow-up discussion.

Here is the related dev ML discussion: https://lists.apache.org/thread/5og0qjo8l9rf0kytqjg4gn7d9r81gf79.

Can we add tests for this new logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the test!

return resp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.polaris.service.catalog.iceberg;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
Expand All @@ -41,6 +42,19 @@ public class IcebergRestConfigurationEventServiceDelegator
@Inject PolarisEventListener polarisEventListener;
@Inject PolarisEventMetadataFactory eventMetadataFactory;

@VisibleForTesting
public IcebergRestConfigurationEventServiceDelegator(
IcebergCatalogAdapter delegate,
PolarisEventListener polarisEventListener,
PolarisEventMetadataFactory eventMetadataFactory) {
this.delegate = delegate;
this.polarisEventListener = polarisEventListener;
this.eventMetadataFactory = eventMetadataFactory;
}

// Default constructor for CDI
public IcebergRestConfigurationEventServiceDelegator() {}

@Override
public Response getConfig(
String warehouse, RealmContext realmContext, SecurityContext securityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.polaris.service.events;

import java.util.Map;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
Expand All @@ -30,12 +29,12 @@
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.polaris.service.types.CommitTableRequest;
import org.apache.polaris.service.types.CommitViewRequest;
import org.apache.polaris.service.types.NotificationRequest;

Expand Down Expand Up @@ -330,20 +329,21 @@ public record BeforeUpdateTableEvent(
String catalogName,
Namespace namespace,
String sourceTable,
CommitTableRequest commitTableRequest)
UpdateTableRequest commitTableRequest)
implements PolarisEvent {
@Override
public PolarisEventType type() {
return PolarisEventType.BEFORE_UPDATE_TABLE;
}
}

/** LoadTableResponse is optional; it will not be populated in case of a transaction. */
public record AfterUpdateTableEvent(
PolarisEventMetadata metadata,
String catalogName,
Namespace namespace,
String sourceTable,
CommitTableRequest commitTableRequest,
UpdateTableRequest commitTableRequest,
LoadTableResponse loadTableResponse)
implements PolarisEvent {
@Override
Expand Down Expand Up @@ -584,32 +584,6 @@ public PolarisEventType type() {
}

// Legacy events
public record BeforeCommitTableEvent(
PolarisEventMetadata metadata,
String catalogName,
TableIdentifier identifier,
TableMetadata metadataBefore,
TableMetadata metadataAfter)
implements PolarisEvent {
@Override
public PolarisEventType type() {
return PolarisEventType.BEFORE_COMMIT_TABLE;
}
}

public record AfterCommitTableEvent(
PolarisEventMetadata metadata,
String catalogName,
TableIdentifier identifier,
TableMetadata metadataBefore,
TableMetadata metadataAfter)
implements PolarisEvent {
@Override
public PolarisEventType type() {
return PolarisEventType.AFTER_COMMIT_TABLE;
}
}

public record BeforeCommitViewEvent(
PolarisEventMetadata metadata,
String catalogName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ public enum PolarisEventType {
AFTER_RENAME_TABLE,
BEFORE_UPDATE_TABLE,
AFTER_UPDATE_TABLE,
BEFORE_COMMIT_TABLE,
AFTER_COMMIT_TABLE,
BEFORE_COMMIT_TABLE, // REMOVED FROM SOURCE CODE
AFTER_COMMIT_TABLE, // REMOVED FROM SOURCE CODE
Comment on lines +128 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove them as there is no usage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that will change the ordinal numbers of the enum and so I'm avoiding doing that. Given Events are still in "preview" we have the ability to do so without any prior notice - but just being careful. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a valid concern. I'm OK to remove it as this is preview feature, but it requires notice. Another option is to adding code field to explicitly set a number for the type. This is more robust so that it prevents any developer adding an event in the middle and accidentally break downstreams. It isn't a blocker for this PR though. Can you file an issue for this?

Something like:

public enum EventType {
    AFTER_UPDATE(101),
    AFTER_DELETE(102);

    private final int code;

    EventType(int code) {
        this.code = code;
    }

    public int code() {
        return code;
    }
}

Adding a constructor will prevent accidental ordinal changes and make future evolution safer. Given the downstream relies on the event type, I think it's reasonable to add a explicit code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, sounds good! I will open the issue!

BEFORE_REFRESH_TABLE,
AFTER_REFRESH_TABLE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,6 @@ default void onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent

default void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {}

default void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {}

default void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {}

default void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {}

default void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2369,19 +2369,6 @@ public void testEventsAreEmitted() {
var afterRefreshEvent =
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshTableEvent.class);
Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE);

var beforeTableEvent =
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitTableEvent.class);
Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE);
Assertions.assertThat(beforeTableEvent.metadataBefore().properties().get(key))
.isEqualTo(valOld);
Assertions.assertThat(beforeTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew);

var afterTableEvent =
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class);
Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE);
Assertions.assertThat(afterTableEvent.metadataBefore().properties().get(key)).isEqualTo(valOld);
Assertions.assertThat(afterTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew);
}

private static PageToken nextRequest(Page<?> previousPage) {
Expand Down
Loading