Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2540] Report expiring snapshots event for Iceberg and mixed format #2541

Closed
wants to merge 14 commits into from
Closed
5 changes: 5 additions & 0 deletions ams/api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
<artifactId>fastjson</artifactId>
</dependency>

<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@

/** Event types define */
public enum EventType {
ICEBERG_REPORT
ICEBERG_REPORT,
EXPIRE_REPORT

Check warning on line 24 in ams/api/src/main/java/com/netease/arctic/ams/api/events/EventType.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/EventType.java#L23-L24

Added lines #L23 - L24 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.api.events;

import org.immutables.value.Value;

/**
* Amoro automatically maintains datalake metadata files(e.g. expire snapshots, clean orphan
* files...), every maintenance process will generate an event which could be reported to {@link
* com.netease.arctic.ams.api.events.EventListener EventListener}
*/
@Value.Immutable
public interface ExpireEvent extends TableEvent {
/**
* The expiring process id
*
* @return id
*/
long processId();

/**
* Operation of expiring(e.g. expire snapshots)
*
* @return expiring operation
*/
ExpireOperation operation();

/**
* Details of expiring event
*
* @return event detail
*/
ExpireResult expireResult();

/**
* Get expiring event result as target specific Class
*
* @param clazz Subclass of ExpireResult
* @return Specific Class
* @param <R> Specific Class
*/
default <R extends ExpireResult> R getExpireResultAs(Class<R> clazz) {
return clazz.cast(expireResult());

Check warning on line 59 in ams/api/src/main/java/com/netease/arctic/ams/api/events/ExpireEvent.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/ExpireEvent.java#L59

Added line #L59 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.api.events;

/** Define various expiration operations */
public enum ExpireOperation {
EXPIRE_SNAPSHOTS,
REMOVE_ORPHAN_FILES,
REMOVE_DANGLING_FILES,
EXPIRE_DATA

Check warning on line 26 in ams/api/src/main/java/com/netease/arctic/ams/api/events/ExpireOperation.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/ExpireOperation.java#L22-L26

Added lines #L22 - L26 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.api.events;

import java.time.Duration;

/** Expire event details */
public interface ExpireResult {
/**
* total duration of the event
*
* @return duration
*/
Duration totalDuration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.ams.api.events;

import com.netease.arctic.ams.api.TableFormat;
import org.apache.iceberg.metrics.MetricsReport;

/** An event triggered when iceberg report metrics */
Expand All @@ -27,16 +28,13 @@
private final String catalog;
private final String database;
private final String table;
private final boolean external;

public IcebergReportEvent(
String catalog, String database, String table, boolean external, MetricsReport report) {
public IcebergReportEvent(String catalog, String database, String table, MetricsReport report) {

Check warning on line 32 in ams/api/src/main/java/com/netease/arctic/ams/api/events/IcebergReportEvent.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/IcebergReportEvent.java#L32

Added line #L32 was not covered by tests
this.report = report;
this.timestamp = System.currentTimeMillis();
this.catalog = catalog;
this.database = database;
this.table = table;
this.external = external;
}

@Override
Expand Down Expand Up @@ -65,8 +63,8 @@
}

@Override
public boolean isExternal() {
return external;
public TableFormat format() {
return TableFormat.ICEBERG;

Check warning on line 67 in ams/api/src/main/java/com/netease/arctic/ams/api/events/IcebergReportEvent.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/IcebergReportEvent.java#L67

Added line #L67 was not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package com.netease.arctic.ams.api.events;

import com.netease.arctic.ams.api.TableFormat;

/** An event associated with a table */
public interface TableEvent extends Event {

Expand All @@ -43,9 +45,9 @@ public interface TableEvent extends Event {
String table();

/**
* External or Internal Catalog that this event source related to.
* Effective table format
*
* @return True if event source from an external catalog.
* @return table format
*/
boolean isExternal();
TableFormat format();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.api.events.iceberg;

import com.netease.arctic.ams.api.events.ExpireResult;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.immutables.value.Value;

import java.util.Set;

/** Event details for expiring snapshots in native Iceberg format */
@Value.Immutable
public abstract class ExpireSnapshotsResult implements ExpireResult {

Check warning on line 29 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java#L29

Added line #L29 was not covered by tests
@Value.Default
public Set<String> deletedDataFiles() {
return Sets.newHashSet();

Check warning on line 32 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java#L32

Added line #L32 was not covered by tests
}

@Value.Default
public Set<String> deletedPositionDeleteFiles() {
return Sets.newHashSet();

Check warning on line 37 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java#L37

Added line #L37 was not covered by tests
}

@Value.Default
public Set<String> deletedEqualityDeleteFiles() {
return Sets.newHashSet();

Check warning on line 42 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java#L42

Added line #L42 was not covered by tests
}

public abstract Set<String> deletedMetadataFiles();

@Value.Default
public Set<String> deletedStatisticsFiles() {
return Sets.newHashSet();

Check warning on line 49 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/ExpireSnapshotsResult.java#L49

Added line #L49 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.api.events.iceberg;

import com.netease.arctic.ams.api.events.ExpireResult;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.immutables.value.Value;

import java.util.Set;

/** Event details for cleaning orphan files in native Iceberg format */
@Value.Immutable
public abstract class RemoveOrphanFilesResult implements ExpireResult {

Check warning on line 29 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/RemoveOrphanFilesResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/RemoveOrphanFilesResult.java#L29

Added line #L29 was not covered by tests
@Value.Default
public Set<String> removedFiles() {
return Sets.newHashSet();

Check warning on line 32 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/RemoveOrphanFilesResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/RemoveOrphanFilesResult.java#L32

Added line #L32 was not covered by tests
}

@Value.Default
public long releasedFileSize() {
return 0L;

Check warning on line 37 in ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/RemoveOrphanFilesResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/iceberg/RemoveOrphanFilesResult.java#L37

Added line #L37 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.api.events.mixed;

import com.netease.arctic.ams.api.events.ExpireResult;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.immutables.value.Value;

import java.util.Set;

/** Event details for expiring snapshots in mixed format */
@Value.Immutable
public abstract class ExpireMixedSnapshotsResult implements ExpireResult {

Check warning on line 29 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L29

Added line #L29 was not covered by tests
@Value.Default
public Set<String> deletedBaseFiles() {
return Sets.newHashSet();

Check warning on line 32 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L32

Added line #L32 was not covered by tests
}

@Value.Default
public Set<String> deletedInsertFiles() {
return Sets.newHashSet();

Check warning on line 37 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L37

Added line #L37 was not covered by tests
}

@Value.Default
public Set<String> deletedPositionDeleteFiles() {
return Sets.newHashSet();

Check warning on line 42 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L42

Added line #L42 was not covered by tests
}

@Value.Default
public Set<String> deletedEqualityDeleteFiles() {
return Sets.newHashSet();

Check warning on line 47 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L47

Added line #L47 was not covered by tests
}

@Value.Default
public Set<String> deletedIcebergEqualityDeleteFiles() {
return Sets.newHashSet();

Check warning on line 52 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L52

Added line #L52 was not covered by tests
}

public abstract Set<String> deletedMetadataFiles();

@Value.Default
public Set<String> deletedStatisticsFiles() {
return Sets.newHashSet();

Check warning on line 59 in ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java

View check run for this annotation

Codecov / codecov/patch

ams/api/src/main/java/com/netease/arctic/ams/api/events/mixed/ExpireMixedSnapshotsResult.java#L59

Added line #L59 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ public void reportMetrics(Context ctx) {
identifier.getCatalog(),
identifier.getDatabase(),
identifier.getTableName(),
false,
metricsRequest.report());
EventsManager.getInstance().emit(event);
return null;
Expand Down
Loading
Loading