Skip to content

Commit

Permalink
[FLINK-25520][Table SQL/API] Implement "ALTER TABLE ... COMPACT" SQL
Browse files Browse the repository at this point in the history
 * Convert "ALTER TABLE ... COMPACT" to RelNode
 * Add interface method `ManagedTableFactory#onCompact` and test implementation to inject file metadata as hints
 * Add support for "EXPLAIN PLAN FOR ALTER TABLE ... COMPACT"
 * And plan test and ITCase
  • Loading branch information
LadyForest committed Jan 18, 2022
1 parent b7fd63b commit 6d75de0
Show file tree
Hide file tree
Showing 22 changed files with 2,163 additions and 157 deletions.
Expand Up @@ -27,7 +27,7 @@
public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {

/**
* Write a element to the part file.
* Write an element to the part file.
*
* @param element the element to be written.
* @param currentTime the writing time.
Expand Down
Expand Up @@ -113,7 +113,6 @@
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
Expand Down Expand Up @@ -1006,8 +1005,6 @@ public TableResultInternal executeInternal(Operation operation) {
for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
catalog.dropPartition(tablePath, spec, ifExists);
}
} else if (alterTableOperation instanceof AlterTableCompactOperation) {
// TODO: FLINK-25176 work with managed table
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
Expand Down
Expand Up @@ -739,6 +739,26 @@ public void createTemporaryTable(
});
}

/**
* Resolve dynamic options for compact operation on a Flink's managed table.
*
* @param origin The resolved managed table with enriched options.
* @param tableIdentifier The fully qualified path of the managed table.
* @param partitionSpec User-specified unresolved partition spec.
* @return dynamic options which describe the metadata of compaction
*/
public Map<String, String> resolveCompactManagedTableOptions(
ResolvedCatalogTable origin,
ObjectIdentifier tableIdentifier,
CatalogPartitionSpec partitionSpec) {
return managedTableListener.notifyTableCompaction(
catalogs.getOrDefault(tableIdentifier.getCatalogName(), null),
tableIdentifier,
origin,
partitionSpec,
false);
}

/**
* Drop a temporary table in a given fully qualified path.
*
Expand Down
Expand Up @@ -80,6 +80,23 @@ public void notifyTableDrop(
}
}

/** Notify compaction for managed table. */
public Map<String, String> notifyTableCompaction(
@Nullable Catalog catalog,
ObjectIdentifier identifier,
ResolvedCatalogBaseTable<?> table,
CatalogPartitionSpec partitionSpec,
boolean isTemporary) {
if (isManagedTable(catalog, table)) {
return discoverManagedTableFactory(classLoader)
.onCompactTable(
createTableFactoryContext(
identifier, (ResolvedCatalogTable) table, isTemporary),
partitionSpec);
}
throw new UnsupportedOperationException("Only managed table supports compaction");
}

/** Check a resolved catalog table is Flink's managed table or not. */
public static boolean isManagedTable(
@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
Expand Down
Expand Up @@ -18,41 +18,32 @@

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.OperationUtils;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.operations.CatalogQueryOperation;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Operation to describe "ALTER TABLE [PARTITION partition_spec] COMPACT" statement. */
public class AlterTableCompactOperation extends AlterTableOperation {
public class AlterTableCompactOperation extends CatalogQueryOperation {

private final CatalogPartitionSpec partitionSpec;
private final ResolvedCatalogTable resolvedManagedTable;
private final Map<String, String> compactOptions;

public AlterTableCompactOperation(
ObjectIdentifier tableIdentifier, @Nullable CatalogPartitionSpec partitionSpec) {
super(tableIdentifier);
this.partitionSpec = partitionSpec;
ObjectIdentifier tableIdentifier,
ResolvedCatalogTable resolvedManagedTable,
Map<String, String> compactOptions) {
super(tableIdentifier, resolvedManagedTable.getResolvedSchema());
this.resolvedManagedTable = resolvedManagedTable;
this.compactOptions = compactOptions;
}

public Map<String, String> getPartitionSpec() {
return partitionSpec == null
? Collections.emptyMap()
: new LinkedHashMap<>(partitionSpec.getPartitionSpec());
public ResolvedCatalogTable getResolvedManagedTable() {
return resolvedManagedTable;
}

@Override
public String asSummaryString() {
String spec =
partitionSpec == null
? ""
: String.format(
"PARTITION (%s) ",
OperationUtils.formatPartitionSpec(partitionSpec));
return String.format("ALTER TABLE %s %sCOMPACT", tableIdentifier.asSummaryString(), spec);
public Map<String, String> getCompactOptions() {
return compactOptions;
}
}
5 changes: 5 additions & 0 deletions flink-table/flink-table-common/pom.xml
Expand Up @@ -69,6 +69,11 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.CatalogPartitionSpec;

import java.util.Map;

Expand Down Expand Up @@ -50,6 +51,13 @@ default String factoryIdentifier() {
/** Notifies the listener that a table drop occurred. */
void onDropTable(Context context, boolean ignoreIfNotExists);

/**
* Notifies the listener that a table compaction occurred.
*
* @return dynamic options of the file entries under compaction for this table.
*/
Map<String, String> onCompactTable(Context context, CatalogPartitionSpec partitionSpec);

/** Discovers the unique implementation of {@link ManagedTableFactory} without identifier. */
static ManagedTableFactory discoverManagedTableFactory(ClassLoader classLoader) {
return FactoryUtil.discoverManagedTableFactory(classLoader, ManagedTableFactory.class);
Expand Down

0 comments on commit 6d75de0

Please sign in to comment.