diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java index 6832f93c6ec854..c316254976db16 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java @@ -27,7 +27,7 @@ public interface InProgressFileWriter extends PartFileInfo { /** - * Write a element to the part file. + * Write an element to the part file. * * @param element the element to be written. * @param currentTime the writing time. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index c445fc30283985..8d6b4c51383e9e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -113,7 +113,6 @@ import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation; -import org.apache.flink.table.operations.ddl.AlterTableCompactOperation; import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableOperation; import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; @@ -1006,8 +1005,6 @@ public TableResultInternal executeInternal(Operation operation) { for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) { catalog.dropPartition(tablePath, spec, ifExists); } - } else if (alterTableOperation instanceof AlterTableCompactOperation) { - // TODO: FLINK-25176 work with managed table } return TableResultImpl.TABLE_RESULT_OK; } catch (TableAlreadyExistException | TableNotExistException e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 5751170a38fd1e..8d9fa33ab00632 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -739,6 +739,26 @@ public void createTemporaryTable( }); } + /** + * Resolve dynamic options for compact operation on a Flink's managed table. + * + * @param origin The resolved managed table with enriched options. + * @param tableIdentifier The fully qualified path of the managed table. + * @param partitionSpec User-specified unresolved partition spec. 
+     * @return dynamic options which describe the metadata of compaction
+     */
+    public Map<String, String> resolveCompactManagedTableOptions(
+            ResolvedCatalogTable origin,
+            ObjectIdentifier tableIdentifier,
+            CatalogPartitionSpec partitionSpec) {
+        return managedTableListener.notifyTableCompaction(
+                catalogs.getOrDefault(tableIdentifier.getCatalogName(), null),
+                tableIdentifier,
+                origin,
+                partitionSpec,
+                false);
+    }
+
     /**
      * Drop a temporary table in a given fully qualified path.
      *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
index 93d13d5ca9107c..5ea08d2efea314 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -80,6 +80,23 @@ public void notifyTableDrop(
         }
     }
 
+    /** Notifies the compaction of a managed table. */
+    public Map<String, String> notifyTableCompaction(
+            @Nullable Catalog catalog,
+            ObjectIdentifier identifier,
+            ResolvedCatalogBaseTable<?> table,
+            CatalogPartitionSpec partitionSpec,
+            boolean isTemporary) {
+        if (isManagedTable(catalog, table)) {
+            return discoverManagedTableFactory(classLoader)
+                    .onCompactTable(
+                            createTableFactoryContext(
+                                    identifier, (ResolvedCatalogTable) table, isTemporary),
+                            partitionSpec);
+        }
+        throw new UnsupportedOperationException("Only managed table supports compaction");
+    }
+
     /** Checks whether a resolved catalog table is Flink's managed table. */
     public static boolean isManagedTable(
             @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
index 14af95064e540b..74d0929c858637 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
@@ -18,41 +18,32 @@
 package org.apache.flink.table.operations.ddl;
 
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.operations.CatalogQueryOperation;
 
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 
 /** Operation to describe "ALTER TABLE [PARTITION partition_spec] COMPACT" statement.
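 *
 * <p>A construction sketch, assuming the identifier, the resolved managed table, and the
 * partition spec are supplied by the planner (mirroring what
 * {@code SqlToOperationConverter#convertAlterTableCompact} does in this change; the names below
 * are placeholders):
 *
 * <pre>{@code
 * Map<String, String> compactOptions =
 *         catalogManager.resolveCompactManagedTableOptions(
 *                 resolvedManagedTable, tableIdentifier, partitionSpec);
 * AlterTableCompactOperation compactOperation =
 *         new AlterTableCompactOperation(tableIdentifier, resolvedManagedTable, compactOptions);
 * }</pre>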
*/ -public class AlterTableCompactOperation extends AlterTableOperation { +public class AlterTableCompactOperation extends CatalogQueryOperation { - private final CatalogPartitionSpec partitionSpec; + private final ResolvedCatalogTable resolvedManagedTable; + private final Map compactOptions; public AlterTableCompactOperation( - ObjectIdentifier tableIdentifier, @Nullable CatalogPartitionSpec partitionSpec) { - super(tableIdentifier); - this.partitionSpec = partitionSpec; + ObjectIdentifier tableIdentifier, + ResolvedCatalogTable resolvedManagedTable, + Map compactOptions) { + super(tableIdentifier, resolvedManagedTable.getResolvedSchema()); + this.resolvedManagedTable = resolvedManagedTable; + this.compactOptions = compactOptions; } - public Map getPartitionSpec() { - return partitionSpec == null - ? Collections.emptyMap() - : new LinkedHashMap<>(partitionSpec.getPartitionSpec()); + public ResolvedCatalogTable getResolvedManagedTable() { + return resolvedManagedTable; } - @Override - public String asSummaryString() { - String spec = - partitionSpec == null - ? "" - : String.format( - "PARTITION (%s) ", - OperationUtils.formatPartitionSpec(partitionSpec)); - return String.format("ALTER TABLE %s %sCOMPACT", tableIdentifier.asSummaryString(), spec); + public Map getCompactOptions() { + return compactOptions; } } diff --git a/flink-table/flink-table-common/pom.xml b/flink-table/flink-table-common/pom.xml index 8e8ff8fe284fee..f99fb666b5a63b 100644 --- a/flink-table/flink-table-common/pom.xml +++ b/flink-table/flink-table-common/pom.xml @@ -69,6 +69,11 @@ under the License. org.apache.flink flink-test-utils-junit + + org.apache.flink + flink-shaded-jackson + test + diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java index 6dcba28ce20688..17f00f4000b64e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import java.util.Map; @@ -50,6 +51,13 @@ default String factoryIdentifier() { /** Notifies the listener that a table drop occurred. */ void onDropTable(Context context, boolean ignoreIfNotExists); + /** + * Notifies the listener that a table compaction occurred. + * + * @return dynamic options of the file entries under compaction for this table. + */ + Map onCompactTable(Context context, CatalogPartitionSpec partitionSpec); + /** Discovers the unique implementation of {@link ManagedTableFactory} without identifier. 
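 *
 * <p>A usage sketch for the compaction callback declared above, assuming a factory
 * {@code context} and a {@code partitionSpec} supplied by the caller (this is how
 * {@code ManagedTableListener} invokes it in this change):
 *
 * <pre>{@code
 * ManagedTableFactory factory = ManagedTableFactory.discoverManagedTableFactory(classLoader);
 * Map<String, String> dynamicOptions = factory.onCompactTable(context, partitionSpec);
 * }</pre>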
*/ static ManagedTableFactory discoverManagedTableFactory(ClassLoader classLoader) { return FactoryUtil.discoverManagedTableFactory(classLoader, ManagedTableFactory.class); diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedTableSink.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedTableSink.java new file mode 100644 index 00000000000000..9f4a162c1442e4 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedTableSink.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.TestManagedTableFactory; +import org.apache.flink.table.utils.PartitionPathUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +/** Managed {@link DynamicTableSink} for testing. 
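+ *
+ * <p>A construction sketch, assuming the factory {@code context} and the {@code basePath} that
+ * {@link TestManagedTableFactory} derives from the registered file entries (the partition key and
+ * value below are placeholders):
+ *
+ * <pre>{@code
+ * TestManagedTableSink sink = new TestManagedTableSink(context, new Path(basePath));
+ * sink.applyStaticPartition(Collections.singletonMap("dt", "2022-01-01"));
+ * sink.applyOverwrite(true);
+ * }</pre>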
*/ +public class TestManagedTableSink + implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning { + + private final DynamicTableFactory.Context context; + private final Path basePath; + + private LinkedHashMap staticPartitionSpecs = new LinkedHashMap<>(); + private boolean overwrite = false; + + public TestManagedTableSink(DynamicTableFactory.Context context, Path basePath) { + this.context = context; + this.basePath = basePath; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.insertOnly(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return SinkProvider.of(new TestManagedSink(this.context.getObjectIdentifier(), basePath)); + } + + @Override + public DynamicTableSink copy() { + TestManagedTableSink copied = new TestManagedTableSink(context, basePath); + copied.overwrite = this.overwrite; + copied.staticPartitionSpecs = this.staticPartitionSpecs; + return copied; + } + + @Override + public String asSummaryString() { + return "TestManagedSink"; + } + + @Override + public void applyOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } + + @Override + public void applyStaticPartition(Map partition) { + List partitionKeys = context.getCatalogTable().getPartitionKeys(); + for (String partitionKey : partitionKeys) { + if (partition.containsKey(partitionKey)) { + staticPartitionSpecs.put(partitionKey, partition.get(partitionKey)); + } + } + } + + /** Managed {@link Sink} for testing compaction. */ + public static class TestManagedSink + implements Sink { + + private final ObjectIdentifier tableIdentifier; + private final Path basePath; + + public TestManagedSink(ObjectIdentifier tableIdentifier, Path basePath) { + this.tableIdentifier = tableIdentifier; + this.basePath = basePath; + } + + @Override + public SinkWriter createWriter( + InitContext context, List states) throws IOException { + return new TestManagedSinkWriter(); + } + + @Override + public Optional> createCommitter() { + return Optional.of(new TestManagedSinkCommitter(tableIdentifier, basePath)); + } + + @Override + public Optional> + getCommittableSerializer() { + return Optional.of(new TestManagedSinkCommittableSerializer()); + } + + @Override + public Optional> getWriterStateSerializer() { + return Optional.empty(); + } + + @Override + public Optional> createGlobalCommitter() { + return Optional.empty(); + } + + @Override + public Optional> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Collection getCompatibleStateNames() { + return Collections.emptyList(); + } + } + + /** + * Committable which contains the generated compact files to be created and the old files to be + * deleted. 
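+ *
+ * <p>Committables produced by individual writers are merged before committing, e.g. (assuming two
+ * committables {@code first} and {@code second}):
+ *
+ * <pre>{@code
+ * TestManagedCommittable merged = TestManagedCommittable.combine(Arrays.asList(first, second));
+ * }</pre>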
+ */ + public static class TestManagedCommittable implements Serializable { + private static final long serialVersionUID = 1L; + + private final Map> toAdd; + private final Map> toDelete; + + public TestManagedCommittable( + Map> toAdd, + Map> toDelete) { + this.toAdd = toAdd; + this.toDelete = toDelete; + } + + public static TestManagedCommittable combine(List committables) { + Map> toAdd = new HashMap<>(); + Map> toDelete = new HashMap<>(); + for (TestManagedCommittable committable : committables) { + Map> partialAdd = committable.toAdd; + Map> partialDelete = committable.toDelete; + + for (Map.Entry> entry : partialAdd.entrySet()) { + CatalogPartitionSpec partitionSpec = entry.getKey(); + List elements = toAdd.getOrDefault(partitionSpec, new ArrayList<>()); + elements.addAll(entry.getValue()); + toAdd.put(partitionSpec, elements); + } + + for (Map.Entry> entry : partialDelete.entrySet()) { + CatalogPartitionSpec partitionSpec = entry.getKey(); + Set paths = toDelete.getOrDefault(partitionSpec, new HashSet<>()); + paths.addAll(entry.getValue()); + toDelete.put(partitionSpec, paths); + } + } + return new TestManagedCommittable(toAdd, toDelete); + } + } + + /** Managed {@link SinkWriter} for testing compaction. */ + public static class TestManagedSinkWriter + implements SinkWriter { + + private final Map processedPartitions = new HashMap<>(); + private final Map> stagingElements = new HashMap<>(); + private final Map> toDelete = new HashMap<>(); + + @Override + public void write(RowData element, Context context) + throws IOException, InterruptedException { + assert element.getArity() == 3; + String partition = element.getString(0).toString(); + Path filePath = new Path(element.getString(1).toString()); + RowData rowData = GenericRowData.of(element.getString(2)); + CatalogPartitionSpec currentPartitionSpec = + processedPartitions.getOrDefault( + partition, + new CatalogPartitionSpec( + PartitionPathUtils.extractPartitionSpecFromPath(filePath))); + processedPartitions.put(partition, currentPartitionSpec); + List elements = + stagingElements.getOrDefault(currentPartitionSpec, new ArrayList<>()); + elements.add(rowData); + stagingElements.put(currentPartitionSpec, elements); + Set old = toDelete.getOrDefault(currentPartitionSpec, new HashSet<>()); + old.add(filePath); + toDelete.put(currentPartitionSpec, old); + } + + @Override + public List prepareCommit(boolean flush) + throws IOException, InterruptedException { + return Collections.singletonList(new TestManagedCommittable(stagingElements, toDelete)); + } + + @Override + public void close() throws Exception {} + } + + /** Managed {@link Committer} for testing compaction. 
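+ *
+ * <p>On {@code commit}, the incoming committables are merged via
+ * {@link TestManagedCommittable#combine}, the merged additions are written out as new
+ * {@code compact-*} files, the replaced files are deleted, and the updated file listing is
+ * published back to {@link TestManagedTableFactory#MANAGED_TABLE_FILE_ENTRIES}.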
*/ + public static class TestManagedSinkCommitter implements Committer { + + private final ObjectIdentifier tableIdentifier; + private final Path basePath; + private final transient RowDataEncoder encoder = new RowDataEncoder(); + + public TestManagedSinkCommitter(ObjectIdentifier tableIdentifier, Path basePath) { + this.tableIdentifier = tableIdentifier; + this.basePath = basePath; + } + + @Override + public List commit(List committables) + throws IOException, InterruptedException { + // combine files to add/delete + TestManagedCommittable combinedCommittable = + TestManagedCommittable.combine(committables); + + AtomicReference>> reference = + TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.get(tableIdentifier); + assert reference != null; + Map> managedTableFileEntries = reference.get(); + + // commit new files + commitAdd(combinedCommittable.toAdd, managedTableFileEntries); + + // cleanup old files + commitDelete(combinedCommittable.toDelete, managedTableFileEntries); + + reference.set(managedTableFileEntries); + + return Collections.emptyList(); + } + + @Override + public void close() throws Exception {} + + private void commitAdd( + Map> toAdd, + Map> managedTableFileEntries) + throws IOException { + + Map processedPartitions = new HashMap<>(); + for (Map.Entry> entry : toAdd.entrySet()) { + CatalogPartitionSpec partitionSpec = entry.getKey(); + String partition = + processedPartitions.computeIfAbsent( + partitionSpec, + (spec) -> + PartitionPathUtils.generatePartitionPath( + new LinkedHashMap<>(spec.getPartitionSpec()))); + List elements = entry.getValue(); + Path compactFilePath = + new Path( + basePath, + new Path( + String.format( + "%scompact-%s-file-0", + partition, UUID.randomUUID()))); + FSDataOutputStream outputStream = + compactFilePath + .getFileSystem() + .create(compactFilePath, FileSystem.WriteMode.NO_OVERWRITE); + for (RowData element : elements) { + encoder.encode(element, outputStream); + } + outputStream.flush(); + outputStream.close(); + + List fileEntries = managedTableFileEntries.get(partitionSpec); + fileEntries.add(compactFilePath); + managedTableFileEntries.put(partitionSpec, fileEntries); + } + } + + private void commitDelete( + Map> toDelete, + Map> managedTableFileEntries) + throws IOException { + for (Map.Entry> entry : toDelete.entrySet()) { + CatalogPartitionSpec partitionSpec = entry.getKey(); + Set pathsToDelete = entry.getValue(); + for (Path path : pathsToDelete) { + path.getFileSystem().delete(path, false); + } + List paths = managedTableFileEntries.get(partitionSpec); + paths.removeAll(pathsToDelete); + managedTableFileEntries.put(partitionSpec, paths); + } + } + } + + /** Serializer for {@link TestManagedCommittable} for testing compaction. 
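+ *
+ * <p>A serialization round-trip sketch, assuming an existing {@code committable}:
+ *
+ * <pre>{@code
+ * TestManagedSinkCommittableSerializer serializer = new TestManagedSinkCommittableSerializer();
+ * byte[] bytes = serializer.serialize(committable);
+ * TestManagedCommittable restored = serializer.deserialize(serializer.getVersion(), bytes);
+ * }</pre>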
*/ + public static class TestManagedSinkCommittableSerializer + implements SimpleVersionedSerializer { + + private static final int VERSION = 1; + + private final DataOutputSerializer out = new DataOutputSerializer(64); + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(final TestManagedCommittable committable) throws IOException { + out.writeInt(committable.toAdd.size()); + for (Map.Entry> entry : + committable.toAdd.entrySet()) { + serializePartitionSpec(entry.getKey()); + serializeRowDataElements(entry.getValue()); + } + out.writeInt(committable.toDelete.size()); + for (Map.Entry> entry : + committable.toDelete.entrySet()) { + serializePartitionSpec(entry.getKey()); + serializePaths(entry.getValue()); + } + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + return result; + } + + @Override + public TestManagedCommittable deserialize(final int version, final byte[] serialized) + throws IOException { + if (version == VERSION) { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + int newFileSize = in.readInt(); + Map> toCommit = new HashMap<>(newFileSize); + for (int i = 0; i < newFileSize; i++) { + CatalogPartitionSpec partitionSpec = deserializePartitionSpec(in); + List elements = deserializeRowDataElements(in); + toCommit.put(partitionSpec, elements); + } + + int cleanupFileSize = in.readInt(); + Map> toCleanup = new HashMap<>(cleanupFileSize); + for (int i = 0; i < cleanupFileSize; i++) { + CatalogPartitionSpec partitionSpec = deserializePartitionSpec(in); + Set paths = deserializePaths(in); + toCleanup.put(partitionSpec, paths); + } + return new TestManagedCommittable(toCommit, toCleanup); + } + throw new IOException(String.format("Unknown version %d", version)); + } + + private void serializePartitionSpec(CatalogPartitionSpec partitionSpec) throws IOException { + Map partitionKVs = partitionSpec.getPartitionSpec(); + out.writeInt(partitionKVs.size()); + for (Map.Entry partitionKV : partitionKVs.entrySet()) { + out.writeUTF(partitionKV.getKey()); + out.writeUTF(partitionKV.getValue()); + } + } + + private void serializeRowDataElements(List elements) throws IOException { + out.writeInt(elements.size()); + for (RowData element : elements) { + out.writeUTF(element.getString(0).toString()); + } + } + + private void serializePaths(Set paths) throws IOException { + out.writeInt(paths.size()); + for (Path path : paths) { + path.write(out); + } + } + + private CatalogPartitionSpec deserializePartitionSpec(DataInputDeserializer in) + throws IOException { + int size = in.readInt(); + LinkedHashMap partitionKVs = new LinkedHashMap<>(size); + for (int i = 0; i < size; i++) { + String partitionKey = in.readUTF(); + String partitionValue = in.readUTF(); + partitionKVs.put(partitionKey, partitionValue); + } + return new CatalogPartitionSpec(partitionKVs); + } + + private List deserializeRowDataElements(DataInputDeserializer in) + throws IOException { + int size = in.readInt(); + List elements = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + elements.add(GenericRowData.of(StringData.fromString(in.readUTF()))); + } + return elements; + } + + private Set deserializePaths(DataInputDeserializer in) throws IOException { + int size = in.readInt(); + Set paths = new HashSet<>(size); + for (int i = 0; i < size; i++) { + Path path = new Path(); + path.read(in); + paths.add(path); + } + return paths; + } + } + + /** An {@link Encoder} implementation to encode records. 
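+ *
+ * <p>Each record is written as the UTF-8 bytes of its first string field followed by a newline,
+ * e.g. (assuming an open {@code outputStream}):
+ *
+ * <pre>{@code
+ * encoder.encode(GenericRowData.of(StringData.fromString("payload")), outputStream);
+ * }</pre>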
*/ + private static class RowDataEncoder implements Encoder { + + private static final long serialVersionUID = 1L; + + private static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0]; + + @Override + public void encode(RowData rowData, OutputStream stream) throws IOException { + stream.write(rowData.getString(0).toBytes()); + stream.write(LINE_DELIMITER); + } + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedTableSource.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedTableSource.java new file mode 100644 index 00000000000000..07f1b6461d5d2f --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedTableSource.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.utils.PartitionPathUtils; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import javax.annotation.Nullable; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; + +/** Managed {@link DynamicTableSource} for testing. */ +public class TestManagedTableSource implements ScanTableSource { + + private static final String FIELD_NAME_COMPACT_PARTITIONS = "compact-partitions"; + private static final String FIELD_NAME_RESOLVED_PARTITION_SPEC = "resolved-partition-spec"; + private static final String FIELD_NAME_FILE_ENTRIES = "file-entries"; + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final DynamicTableFactory.Context context; + private final CompactPartitions partitions; + private final ChangelogMode changelogMode; + + public TestManagedTableSource( + DynamicTableFactory.Context context, + CompactPartitions partitions, + ChangelogMode changelogMode) { + this.context = context; + this.partitions = partitions; + this.changelogMode = changelogMode; + } + + @Override + public ChangelogMode getChangelogMode() { + return changelogMode; + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + return SourceProvider.of(new TestManagedSource(partitions)); + } + + @Override + public DynamicTableSource copy() { + return new TestManagedTableSource(context, partitions, changelogMode); + } + + @Override + public String asSummaryString() { + return "TestManagedTableSource"; + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + // ~ Inner Classes ------------------------------------------------------------------ + + /** Managed {@link Source} for testing. 
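+ *
+ * <p>The source is bounded: the enumerator creates one {@link TestManagedIterableSourceSplit} per
+ * file entry of every partition under compaction and hands the splits out on request, so each
+ * split re-reads exactly one existing data file.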
*/ + public static class TestManagedSource + implements Source { + private static final long serialVersionUID = 1L; + + private final CompactPartitions partitions; + + public TestManagedSource(CompactPartitions partitions) { + this.partitions = partitions; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader( + SourceReaderContext readerContext) { + return new TestManagedFileSourceReader(readerContext); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + List splits = new ArrayList<>(); + partitions + .getCompactPartitions() + .forEach( + partition -> + partition + .getFileEntries() + .forEach( + fileEntry -> + splits.add( + new TestManagedIterableSourceSplit( + PartitionPathUtils + .generatePartitionPath( + partition + .getResolvedPartitionSpec()), + new Path(fileEntry))))); + return new TestManagedFileSourceSplitEnumerator(enumContext, splits); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, + Void checkpoint) { + throw new UnsupportedOperationException(); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new TestManagedFileSourceSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + // we don't need checkpoint under batch mode + return null; + } + } + + /** Managed {@link SourceReader} for testing. */ + public static class TestManagedFileSourceReader + extends IteratorSourceReader< + RowData, Iterator, TestManagedIterableSourceSplit> { + + public TestManagedFileSourceReader(SourceReaderContext context) { + super(context); + } + } + + /** Managed {@link org.apache.flink.api.connector.source.SourceSplit} for testing. */ + public static class TestManagedIterableSourceSplit + implements IteratorSourceSplit>, Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + private final Path filePath; + + private Iterator iterator; + + public TestManagedIterableSourceSplit(String id, Path filePath) { + this.id = id; + this.filePath = filePath; + } + + @Override + public String splitId() { + return id; + } + + @Override + public Iterator getIterator() { + if (iterator == null) { + try { + BufferedReader reader = new BufferedReader(new FileReader(filePath.getPath())); + iterator = + Iterators.transform( + reader.lines().iterator(), + line -> + GenericRowData.of( + StringData.fromString(id), + StringData.fromString(filePath.getPath()), + StringData.fromString(line))); + } catch (IOException e) { + // ignored + } + } + return iterator; + } + + @Override + public IteratorSourceSplit> getUpdatedSplitForIterator( + Iterator iterator) { + TestManagedIterableSourceSplit recovered = + new TestManagedIterableSourceSplit(this.id, this.filePath); + recovered.iterator = this.iterator; + return recovered; + } + + @Override + public String toString() { + return "TestManagedIterableSourceSplit{" + + "splitId='" + + id + + '\'' + + ", filePath=" + + filePath + + '}'; + } + } + + /** The deserializer for {@link CompactPartition}. 
*/ + public static class CompactPartitionDeserializer extends StdDeserializer { + + private static final long serialVersionUID = 1L; + + private JsonNode root; + + public CompactPartitionDeserializer() { + super(CompactPartition.class); + } + + @Override + public CompactPartition deserialize( + JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException { + JsonNode rootNode = + root == null ? (JsonNode) jsonParser.readValueAsTree().get(0) : root; + + JsonNode partitionSpecNode = rootNode.get(FIELD_NAME_RESOLVED_PARTITION_SPEC); + LinkedHashMap resolvedPartitionSpec = new LinkedHashMap<>(); + List fileEntries = new ArrayList<>(); + Iterator> mapIterator = partitionSpecNode.fields(); + while (mapIterator.hasNext()) { + Map.Entry entry = mapIterator.next(); + resolvedPartitionSpec.put(entry.getKey(), entry.getValue().textValue()); + } + JsonNode fileEntriesNode = rootNode.get(FIELD_NAME_FILE_ENTRIES); + for (JsonNode fileEntry : fileEntriesNode) { + fileEntries.add(fileEntry.asText()); + } + return new CompactPartition(resolvedPartitionSpec, fileEntries); + } + } + + /** The serializer for {@link CompactPartitions}. */ + public static class CompactPartitionsSerializer extends StdSerializer { + + private static final long serialVersionUID = 1L; + + private final CompactPartitionSerializer serializer = new CompactPartitionSerializer(); + + public CompactPartitionsSerializer() { + super(CompactPartitions.class); + } + + @Override + public void serialize( + CompactPartitions compactPartitions, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeFieldName(FIELD_NAME_COMPACT_PARTITIONS); + jsonGenerator.writeStartArray(); + for (CompactPartition partition : compactPartitions.compactPartitions) { + serializer.serialize(partition, jsonGenerator, serializerProvider); + } + jsonGenerator.writeEndArray(); + jsonGenerator.writeEndObject(); + } + } + + /** The deserializer for {@link CompactPartitions}. */ + public static class CompactPartitionsDeserializer extends StdDeserializer { + + private static final long serialVersionUID = 6089784742093294800L; + + private final CompactPartitionDeserializer deserializer = + new CompactPartitionDeserializer(); + + public CompactPartitionsDeserializer() { + super(CompactPartitions.class); + } + + @Override + public CompactPartitions deserialize( + JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException { + List partitions = new ArrayList<>(); + JsonNode rootNode = jsonParser.readValueAsTree(); + JsonNode partitionNodes = rootNode.get(FIELD_NAME_COMPACT_PARTITIONS); + + for (JsonNode partitionEntry : partitionNodes) { + deserializer.root = partitionEntry; + partitions.add(deserializer.deserialize(jsonParser, deserializationContext)); + } + + return new CompactPartitions(partitions); + } + } + + /** Managed {@link SplitEnumerator} for testing. 
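+ *
+ * <p>Pending splits are kept in an in-memory queue; once the queue is drained, the enumerator
+ * signals "no more splits" to the requesting subtask, which ends the bounded scan.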
*/ + public static class TestManagedFileSourceSplitEnumerator + implements SplitEnumerator { + + private final SplitEnumeratorContext context; + private final Queue remainingSplits; + + public TestManagedFileSourceSplitEnumerator( + SplitEnumeratorContext context, + List splits) { + this.context = context; + this.remainingSplits = new ArrayDeque<>(splits); + } + + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + TestManagedIterableSourceSplit split = remainingSplits.poll(); + if (split == null) { + context.signalNoMoreSplits(subtaskId); + } else { + context.assignSplit(split, subtaskId); + } + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + splits.forEach(this.remainingSplits::offer); + } + + @Override + public void addReader(int subtaskId) {} + + @Override + public Void snapshotState(long checkpointId) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException {} + } + + /** Managed {@link SimpleVersionedSerializer} for testing. */ + public static class TestManagedFileSourceSplitSerializer + implements SimpleVersionedSerializer { + + private static final int VERSION = 1; + + private final DataOutputSerializer out = new DataOutputSerializer(64); + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(TestManagedIterableSourceSplit split) throws IOException { + out.writeUTF(split.id); + split.filePath.write(out); + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + return result; + } + + @Override + public TestManagedIterableSourceSplit deserialize(int version, byte[] serialized) + throws IOException { + if (version == VERSION) { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + final String id = in.readUTF(); + final Path path = new Path(); + path.read(in); + return new TestManagedIterableSourceSplit(id, path); + } + throw new IOException(String.format("Unknown version %d", version)); + } + } + + /** + * Test POJO to represent the compaction info for the partition lists to be injected when {@link + * org.apache.flink.table.factories.TestManagedTableFactory#onCompactTable(DynamicTableFactory.Context, + * CatalogPartitionSpec)} is called. + */ + @JsonSerialize(using = CompactPartitionsSerializer.class) + @JsonDeserialize(using = CompactPartitionsDeserializer.class) + public static class CompactPartitions implements Serializable { + + private static final long serialVersionUID = 1L; + + private final List compactPartitions; + + public CompactPartitions(List compactPartitions) { + this.compactPartitions = compactPartitions; + } + + public List getCompactPartitions() { + return compactPartitions; + } + } + + /** + * Test POJO to represent the compaction info for a single partition to be injected when {@link + * org.apache.flink.table.factories.TestManagedTableFactory#onCompactTable(DynamicTableFactory.Context, + * CatalogPartitionSpec)} is called. 
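+ *
+ * <p>Partition entries travel through the dynamic table options as JSON, e.g. (assuming an
+ * existing {@code CompactPartitions} instance named {@code partitions}):
+ *
+ * <pre>{@code
+ * String json = serializeCompactPartitions(partitions).orElseThrow(IllegalStateException::new);
+ * CompactPartitions restored =
+ *         deserializeCompactPartitions(json).orElseThrow(IllegalStateException::new);
+ * }</pre>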
+ */ + @JsonSerialize(using = CompactPartitionSerializer.class) + @JsonDeserialize(using = CompactPartitionDeserializer.class) + public static class CompactPartition implements Serializable { + + private static final long serialVersionUID = 1L; + + private final LinkedHashMap resolvedPartitionSpec; + + private final List fileEntries; + + public CompactPartition( + LinkedHashMap partitionSpec, List fileEntries) { + this.resolvedPartitionSpec = partitionSpec; + this.fileEntries = fileEntries; + } + + public LinkedHashMap getResolvedPartitionSpec() { + return resolvedPartitionSpec; + } + + public List getFileEntries() { + return fileEntries; + } + + @Override + public String toString() { + return "CompactPartition{" + + "resolvedPartitionSpec=" + + resolvedPartitionSpec + + ", fileEntries=" + + fileEntries + + '}'; + } + } + + /** The serializer for {@link CompactPartition}. */ + public static class CompactPartitionSerializer extends StdSerializer { + + private static final long serialVersionUID = 1L; + + public CompactPartitionSerializer() { + super(CompactPartition.class); + } + + @Override + public void serialize( + CompactPartition compactPartition, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectFieldStart(FIELD_NAME_RESOLVED_PARTITION_SPEC); + for (Map.Entry entry : + compactPartition.resolvedPartitionSpec.entrySet()) { + jsonGenerator.writeStringField(entry.getKey(), entry.getValue()); + } + jsonGenerator.writeEndObject(); + jsonGenerator.writeFieldName(FIELD_NAME_FILE_ENTRIES); + jsonGenerator.writeStartArray(); + for (String fileEntry : compactPartition.fileEntries) { + jsonGenerator.writeString(fileEntry); + } + jsonGenerator.writeEndArray(); + jsonGenerator.writeEndObject(); + } + } + + public static Optional deserializeCompactPartitions(String json) { + try { + return Optional.of(MAPPER.readValue(json, CompactPartitions.class)); + } catch (JsonProcessingException ignored) { + } + return Optional.empty(); + } + + public static Optional serializeCompactPartitions(CompactPartitions partitions) { + try { + return Optional.of(MAPPER.writeValueAsString(partitions)); + } catch (JsonProcessingException e) { + + } + return Optional.empty(); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java index 5e2d2597834217..9d524f52c43a92 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.TestManagedTableSource; import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock; import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock; import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock; @@ -63,8 +64,7 @@ public void testManagedConnector() { final Map options = createAllOptions(); options.remove("connector"); final DynamicTableSource actualSource = createTableSource(SCHEMA, options); - assertThat(actualSource) - 
.isExactlyInstanceOf(TestManagedTableFactory.TestManagedTableSource.class); + assertThat(actualSource).isExactlyInstanceOf(TestManagedTableSource.class); } @Test diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java index 664b11ad5f8764..18e4e3d8c5a8e5 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java @@ -18,22 +18,40 @@ package org.apache.flink.table.factories; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.TestManagedTableSink; +import org.apache.flink.table.connector.source.TestManagedTableSource; +import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.flink.types.RowKind; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.table.connector.source.TestManagedTableSource.deserializeCompactPartitions; +import static org.apache.flink.table.connector.source.TestManagedTableSource.serializeCompactPartitions; /** A test {@link ManagedTableFactory}. 
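 *
 * <p>Compaction tests can pre-register the current file listing of a managed table so that
 * {@code onCompactTable} and the test committer can resolve and update it (identifiers below are
 * placeholders):
 *
 * <pre>{@code
 * TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.put(
 *         tableIdentifier, new AtomicReference<>(partitionFileEntries));
 * }</pre>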
*/ public class TestManagedTableFactory @@ -46,11 +64,21 @@ public class TestManagedTableFactory public static final Map>> MANAGED_TABLES = new ConcurrentHashMap<>(); + public static final Map< + ObjectIdentifier, AtomicReference>>> + MANAGED_TABLE_FILE_ENTRIES = new ConcurrentHashMap<>(); + private static final ConfigOption CHANGELOG_MODE = ConfigOptions.key("changelog-mode") .stringType() .defaultValue("I"); // all available "I,UA,UB,D" + private static final ConfigOption COMPACT_FILE_BASE_PATH = + ConfigOptions.key("compact.file-base-path").stringType().noDefaultValue(); + + private static final ConfigOption> COMPACT_FILE_ENTRIES = + ConfigOptions.key("compact.file-entries").stringType().asList().defaultValues(); + @Override public Set> requiredOptions() { HashSet> configOptions = new HashSet<>(); @@ -60,7 +88,7 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - return new HashSet<>(); + return Collections.emptySet(); } @Override @@ -100,59 +128,53 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) { } } + @Override + public Map onCompactTable( + Context context, CatalogPartitionSpec catalogPartitionSpec) { + ObjectIdentifier tableIdentifier = context.getObjectIdentifier(); + ResolvedCatalogTable table = context.getCatalogTable(); + Map newOptions = new HashMap<>(table.getOptions()); + + resolveCompactFileBasePath(tableIdentifier) + .ifPresent(s -> newOptions.put(COMPACT_FILE_BASE_PATH.key(), s)); + + validateAndResolveCompactFileEntries( + tableIdentifier, table.getPartitionKeys(), catalogPartitionSpec) + .ifPresent(s -> newOptions.put(COMPACT_FILE_ENTRIES.key(), s)); + return newOptions; + } + @Override public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE)); - return new TestManagedTableSource(changelogMode); + TestManagedTableSource.CompactPartitions compactPartitions = + deserializeCompactPartitions( + context.getCatalogTable() + .getOptions() + .getOrDefault(COMPACT_FILE_ENTRIES.key(), "")) + .orElse( + new TestManagedTableSource.CompactPartitions( + Collections.emptyList())); + return new TestManagedTableSource(context, compactPartitions, changelogMode); } @Override public DynamicTableSink createDynamicTableSink(Context context) { - return new TestManagedTableSink(); - } - - /** Managed {@link DynamicTableSource} for testing. 
*/ - public static class TestManagedTableSource implements ScanTableSource { - - private final ChangelogMode changelogMode; - - public TestManagedTableSource(ChangelogMode changelogMode) { - this.changelogMode = changelogMode; - } - - @Override - public ChangelogMode getChangelogMode() { - return changelogMode; - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - return null; - } - - @Override - public DynamicTableSource copy() { - return new TestManagedTableSource(changelogMode); - } - - @Override - public String asSummaryString() { - return "TestManagedSource"; - } - - @Override - public boolean equals(Object o) { - return super.equals(o); - } - - @Override - public int hashCode() { - return super.hashCode(); + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + String basePath = helper.getOptions().get(COMPACT_FILE_BASE_PATH); + if (basePath == null) { + throw new ValidationException( + String.format( + "Cannot find base path for managed table %s", + context.getObjectIdentifier().asSerializableString())); } + return new TestManagedTableSink(context, new Path(basePath)); } - private ChangelogMode parseChangelogMode(String string) { + // ~ Tools ------------------------------------------------------------------ + + private static ChangelogMode parseChangelogMode(String string) { ChangelogMode.Builder builder = ChangelogMode.newBuilder(); for (String split : string.split(",")) { switch (split.trim()) { @@ -175,37 +197,154 @@ private ChangelogMode parseChangelogMode(String string) { return builder.build(); } - /** Managed {@link DynamicTableSink} for testing. */ - public static class TestManagedTableSink implements DynamicTableSink { - - @Override - public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { - return requestedMode; + private static LinkedHashMap sortPartitionKey( + List partitionKeys, Map unresolvedPartitionSpec) { + LinkedHashMap partialResolvedPartitionSpec = new LinkedHashMap<>(); + for (String partitionKey : partitionKeys) { + partialResolvedPartitionSpec.put( + partitionKey, unresolvedPartitionSpec.get(partitionKey)); } + return partialResolvedPartitionSpec; + } - @Override - public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - throw new UnsupportedOperationException(); + private static Pattern buildPartitionRegex( + List partitionKeys, Map unresolvedPartitionSpec) { + StringBuilder regexBuilder = new StringBuilder(); + int index = 0; + for (String partitionKey : partitionKeys) { + String partitionValue = unresolvedPartitionSpec.get(partitionKey); + if (index > 0) { + regexBuilder.append(Path.SEPARATOR); + } + regexBuilder.append(partitionKey); + regexBuilder.append('='); + if (partitionValue != null) { + regexBuilder.append(partitionValue); + } else { + regexBuilder.append("([^/]+)"); + } + index++; } + return Pattern.compile(regexBuilder.toString()); + } - @Override - public DynamicTableSink copy() { - throw new UnsupportedOperationException(); - } + private static boolean compactedPartition(List partitionPaths) { + return partitionPaths != null + && partitionPaths.size() == 1 + && partitionPaths.get(0).getName().startsWith("compact-"); + } - @Override - public String asSummaryString() { - throw new UnsupportedOperationException(); + private static Optional resolveCompactFileBasePath(ObjectIdentifier tableIdentifier) { + AtomicReference>> reference = + MANAGED_TABLE_FILE_ENTRIES.get(tableIdentifier); + if (reference != null) { + Map> 
managedTableFileEntries = reference.get(); + for (Map.Entry> entry : + managedTableFileEntries.entrySet()) { + List partitionFiles = entry.getValue(); + if (partitionFiles.size() > 0) { + Path file = partitionFiles.get(0); + LinkedHashMap partitionSpec = + PartitionPathUtils.extractPartitionSpecFromPath(file); + if (partitionSpec.isEmpty()) { + return Optional.of(file.getParent().getPath()); + } else { + String tableName = tableIdentifier.asSummaryString(); + int index = file.getPath().indexOf(tableName); + if (index != -1) { + return Optional.of( + file.getPath().substring(0, index + tableName.length())); + } + } + } + } } + return Optional.empty(); + } - @Override - public boolean equals(Object o) { - throw new UnsupportedOperationException(); + private Optional validateAndResolveCompactFileEntries( + ObjectIdentifier tableIdentifier, + List partitionKeys, + CatalogPartitionSpec unresolvedPartitionSpec) { + AtomicReference>> reference = + MANAGED_TABLE_FILE_ENTRIES.get(tableIdentifier); + if (reference != null) { + List resolvedPartitionSpecs = + validateAndResolvePartitionSpec( + reference.get(), partitionKeys, unresolvedPartitionSpec) + .stream() + .map(CatalogPartitionSpec::new) + .collect(Collectors.toList()); + + List partitionFilesToCompact = + resolvedPartitionSpecs.stream() + .map( + partitionSpec -> + Tuple2.of( + partitionSpec, + reference.get().get(partitionSpec))) + .filter(tuple -> !compactedPartition(tuple.f1)) + .map( + tuple -> + new TestManagedTableSource.CompactPartition( + new LinkedHashMap<>( + tuple.f0.getPartitionSpec()), + tuple.f1.stream() + .map(Path::getPath) + .collect(Collectors.toList()))) + .collect(Collectors.toList()); + return serializeCompactPartitions( + new TestManagedTableSource.CompactPartitions(partitionFilesToCompact)); } + return Optional.empty(); + } + + private static List> validateAndResolvePartitionSpec( + Map> partitionPaths, + List partitionKeys, + CatalogPartitionSpec catalogPartitionSpec) { + + List> allResolvedPartitionSpecs = + partitionPaths.keySet().stream() + .map( + catalogPartSpec -> + new LinkedHashMap<>(catalogPartSpec.getPartitionSpec())) + .collect(Collectors.toList()); - @Override - public int hashCode() { - throw new UnsupportedOperationException(); + Set> groundTruth = new HashSet<>(allResolvedPartitionSpecs); + + if (catalogPartitionSpec.getPartitionSpec().isEmpty()) { + // compact the whole table + return allResolvedPartitionSpecs; + } + Map unresolvedPartitionSpec = catalogPartitionSpec.getPartitionSpec(); + List> resolvedPartitionSpecs = new ArrayList<>(); + + if (unresolvedPartitionSpec.size() == partitionKeys.size()) { + LinkedHashMap partialResolvedPartitionSpec = + sortPartitionKey(partitionKeys, unresolvedPartitionSpec); + if (groundTruth.contains(partialResolvedPartitionSpec)) { + resolvedPartitionSpecs.add(partialResolvedPartitionSpec); + } else { + throw new ValidationException( + String.format( + "Cannot resolve partition spec %s", partialResolvedPartitionSpec)); + } + } else if (unresolvedPartitionSpec.size() < partitionKeys.size()) { + Pattern pattern = buildPartitionRegex(partitionKeys, unresolvedPartitionSpec); + for (LinkedHashMap resolvedPartitionSpec : allResolvedPartitionSpecs) { + Matcher matcher = + pattern.matcher( + PartitionPathUtils.generatePartitionPath(resolvedPartitionSpec)); + if (matcher.find()) { + resolvedPartitionSpecs.add(resolvedPartitionSpec); + } + } + if (resolvedPartitionSpecs.isEmpty()) { + throw new ValidationException( + String.format("Cannot resolve partition spec %s", 
unresolvedPartitionSpec)); + } } + return resolvedPartitionSpecs; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 761ef78eec4834..dbd7b5945baba6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -109,6 +109,7 @@ import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; import org.apache.flink.table.operations.ShowColumnsOperation; import org.apache.flink.table.operations.ShowCreateTableOperation; @@ -571,14 +572,18 @@ private Operation convertAlterTableReset( return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(newOptions)); } - private Operation convertAlterTableCompact( + /** + * Convert `ALTER TABLE ... COMPACT` operation to {@link ModifyOperation} for Flink's managed + * table to trigger a compaction batch job. + */ + private ModifyOperation convertAlterTableCompact( ObjectIdentifier tableIdentifier, ResolvedCatalogTable resolvedCatalogTable, SqlAlterTableCompact alterTableCompact) { Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null); if (ManagedTableListener.isManagedTable(catalog, resolvedCatalogTable)) { LinkedHashMap partitionKVs = alterTableCompact.getPartitionKVs(); - CatalogPartitionSpec partitionSpec = null; + CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(Collections.emptyMap()); if (partitionKVs != null) { List orderedPartitionKeys = resolvedCatalogTable.getPartitionKeys(); Set validPartitionKeySet = new HashSet<>(orderedPartitionKeys); @@ -600,7 +605,18 @@ private Operation convertAlterTableCompact( }); partitionSpec = new CatalogPartitionSpec(partitionKVs); } - return new AlterTableCompactOperation(tableIdentifier, partitionSpec); + Map compactOptions = + catalogManager.resolveCompactManagedTableOptions( + resolvedCatalogTable, tableIdentifier, partitionSpec); + QueryOperation child = + new AlterTableCompactOperation( + tableIdentifier, resolvedCatalogTable, compactOptions); + return new CatalogSinkModifyOperation( + tableIdentifier, + child, + partitionSpec.getPartitionSpec(), + false, + compactOptions); } throw new ValidationException( String.format( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java index 23c8be81b524cd..857291e3438d7d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java @@ -23,9 +23,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogManager; import 
org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.connector.ChangelogMode; @@ -57,6 +59,7 @@ import org.apache.flink.table.operations.ValuesQueryOperation; import org.apache.flink.table.operations.WindowAggregateQueryOperation; import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow; +import org.apache.flink.table.operations.ddl.AlterTableCompactOperation; import org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; @@ -111,6 +114,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -342,6 +346,19 @@ private RelNode convertLegacyTableFunction( @Override public RelNode visit(CatalogQueryOperation catalogTable) { ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier(); + if (catalogTable instanceof AlterTableCompactOperation) { + if (!ShortcutUtils.unwrapContext(relBuilder).isBatchMode()) { + throw new ValidationException( + "Compact managed table only works under batch mode."); + } + ResolvedCatalogTable resolvedManagedTable = + ((AlterTableCompactOperation) catalogTable).getResolvedManagedTable(); + Map compactOptions = + ((AlterTableCompactOperation) catalogTable).getCompactOptions(); + return relBuilder + .compactScan(objectIdentifier, resolvedManagedTable, false, compactOptions) + .build(); + } return relBuilder .scan( objectIdentifier.getCatalogName(), diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala index fd74d218f069ef..d58836bbf60b77 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala @@ -23,7 +23,10 @@ import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, import org.apache.flink.table.planner.expressions.WindowProperty import org.apache.flink.table.planner.plan.QueryOperationConverter import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate} +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalTableAggregate +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate import org.apache.flink.table.planner.plan.utils.AggregateUtil import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType} @@ -32,19 +35,24 @@ import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ import org.apache.calcite.rel.RelCollation import 
org.apache.calcite.rel.`type`.RelDataTypeField +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalAggregate import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.SqlKind import org.apache.calcite.tools.RelBuilder.{AggCall, Config, GroupKey} import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory} import org.apache.calcite.util.{ImmutableBitSet, Util} +import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable} +import org.apache.flink.table.factories.FactoryUtil +import org.apache.flink.table.planner.connectors.DynamicSourceUtils +import org.apache.flink.table.planner.hint.FlinkHints +import org.apache.flink.table.planner.plan.stats.FlinkStatistic -import java.lang.Iterable import java.util import java.util.List import java.util.function.UnaryOperator -import scala.collection.JavaConversions._ +import _root_.scala.collection.JavaConverters._ /** * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]]. @@ -108,15 +116,15 @@ class FlinkRelBuilder( /** * Build non-window aggregate for either aggregate or table aggregate. */ - override def aggregate(groupKey: GroupKey, aggCalls: Iterable[AggCall]): RelBuilder = { + def aggregate(groupKey: GroupKey, aggCalls: Iterable[AggCall]): RelBuilder = { // build a relNode, the build() may also return a project - val relNode = super.aggregate(groupKey, aggCalls).build() + val relNode = super.aggregate(groupKey, aggCalls.asJava).build() def isCountStartAgg(agg: LogicalAggregate): Boolean = { if (agg.getGroupCount != 0 || agg.getAggCallList.size() != 1) { return false } - val call = agg.getAggCallList.head + val call = agg.getAggCallList.asScala.head call.getAggregation.getKind == SqlKind.COUNT && call.filterArg == -1 && call.getArgList.isEmpty } @@ -141,7 +149,7 @@ class FlinkRelBuilder( window: LogicalWindow, groupKey: GroupKey, namedProperties: List[NamedWindowProperty], - aggCalls: Iterable[AggCall]): RelBuilder = { + aggCalls: List[AggCall]): RelBuilder = { // build logical aggregate // Because of: @@ -165,8 +173,8 @@ class FlinkRelBuilder( aggregate match { case logicalAggregate: LogicalAggregate if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) => - push(LogicalWindowTableAggregate.create(window, namedProperties, aggregate)) - case _ => push(LogicalWindowAggregate.create(window, namedProperties, aggregate)) + push(LogicalWindowTableAggregate.create(window, namedProperties.asScala, aggregate)) + case _ => push(LogicalWindowAggregate.create(window, namedProperties.asScala, aggregate)) } } @@ -186,6 +194,44 @@ class FlinkRelBuilder( push(relNode) this } + + def compactScan( + identifier: ObjectIdentifier, + catalogTable: ResolvedCatalogTable, + isTemporary: Boolean, + compactOptions: util.Map[String, String]): RelBuilder = { + val flinkContext = context.unwrap(classOf[FlinkContext]) + val config = flinkContext.getTableConfig.getConfiguration + + val hints = new util.ArrayList[RelHint] + val mergedCatalogTable = if (!compactOptions.isEmpty) { + hints.add(RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(compactOptions).build) + catalogTable.copy(FlinkHints.mergeTableOptions(compactOptions, catalogTable.getOptions)) + } else { + catalogTable + } + val tableSource = + FactoryUtil. 
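+ // instantiate the managed table's DynamicTableSource from the merged (compaction-enriched) options before converting it to a scan RelNode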
+ createDynamicTableSource(null, + identifier, + mergedCatalogTable, + config, + Thread.currentThread().getContextClassLoader, + isTemporary) + push( + DynamicSourceUtils.convertSourceToRel( + true, + config, + this, + identifier, + mergedCatalogTable, + FlinkStatistic.UNKNOWN, + hints, + tableSource + ) + ) + this + } } object FlinkRelBuilder { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptClusterFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptClusterFactory.scala index 93e16c22a91226..6d3aa115540c7e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptClusterFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptClusterFactory.scala @@ -18,11 +18,12 @@ package org.apache.flink.table.planner.calcite -import org.apache.flink.table.planner.plan.metadata.{FlinkDefaultRelMetadataProvider, FlinkRelMetadataQuery} - +import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner} import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, RelMetadataQuery} import org.apache.calcite.rex.RexBuilder +import org.apache.flink.table.planner.hint.FlinkHintStrategies import java.util.function.Supplier @@ -38,6 +39,7 @@ object FlinkRelOptClusterFactory { cluster.setMetadataQuerySupplier(new Supplier[RelMetadataQuery]() { def get: FlinkRelMetadataQuery = FlinkRelMetadataQuery.instance() }) + cluster.setHintStrategies(FlinkHintStrategies.createHintStrategyTable()) cluster } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index b6e23de0559381..976c64aba9bb2e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -46,6 +46,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.factories.TestManagedTableFactory; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.BeginStatementSetOperation; import org.apache.flink.table.operations.CatalogSinkModifyOperation; @@ -106,6 +107,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -1318,14 +1320,8 @@ public void testAlterTableCompactOnNonManagedTable() throws Exception { } @Test - public void testAlterTableCompact() throws Exception { + public void testAlterTableCompactOnManagedNonPartitionedTable() throws Exception { prepareManagedTable(false); - Operation operation = parse("alter table tb1 compact", SqlDialect.DEFAULT); - assertThat(operation).isInstanceOf(AlterTableCompactOperation.class); - AlterTableCompactOperation compactOperation = (AlterTableCompactOperation) operation; - - 
assertThat(compactOperation.asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT"); // specify partition on a non-partitioned table assertThatThrownBy( @@ -1341,39 +1337,14 @@ public void testAlterTableCompact() throws Exception { assertThatThrownBy(() -> parse("alter table tb2 compact", SqlDialect.DEFAULT)) .isInstanceOf(ValidationException.class) .hasMessage("Table `cat1`.`db1`.`tb2` doesn't exist or is a temporary table."); + + checkAlterTableCompact( + parse("alter table tb1 compact", SqlDialect.DEFAULT), Collections.emptyMap()); } @Test - public void testAlterTableCompactPartition() throws Exception { + public void testAlterTableCompactOnManagedPartitionedTable() throws Exception { prepareManagedTable(true); - - // compact partitioned table without partition_spec - assertThat(parse("alter table tb1 compact", SqlDialect.DEFAULT).asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 COMPACT"); - - // compact partitioned table without full partition_spec - assertThat( - parse("alter table tb1 partition (b=1) compact", SqlDialect.DEFAULT) - .asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1) COMPACT"); - - assertThat( - parse("alter table tb1 partition (c=2) compact", SqlDialect.DEFAULT) - .asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2) COMPACT"); - - // compact partitioned table with full partition_spec - assertThat( - parse("alter table tb1 partition (b=1,c=2) compact", SqlDialect.DEFAULT) - .asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (b=1, c=2) COMPACT"); - - // compact partitioned table with disordered partition_spec - assertThat( - parse("alter table tb1 partition (c=2,b=1) compact", SqlDialect.DEFAULT) - .asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 PARTITION (c=2, b=1) COMPACT"); - // compact partitioned table with a non-existed partition_spec assertThatThrownBy( () -> @@ -1383,6 +1354,31 @@ public void testAlterTableCompactPartition() throws Exception { .isInstanceOf(ValidationException.class) .hasMessage( "Partition column 'dt' not defined in the table schema. 
Available ordered partition columns: ['b', 'c']"); + + // compact partitioned table with full partition spec + Map staticPartitions = new HashMap<>(); + staticPartitions.put("b", "0"); + staticPartitions.put("c", "flink"); + checkAlterTableCompact( + parse("alter table tb1 partition (b = 0, c = 'flink') compact", SqlDialect.DEFAULT), + staticPartitions); + + // compact partitioned table with subordinate partition spec + staticPartitions = Collections.singletonMap("b", "0"); + checkAlterTableCompact( + parse("alter table tb1 partition (b = 0) compact", SqlDialect.DEFAULT), + staticPartitions); + + // compact partitioned table with secondary partition spec + staticPartitions = Collections.singletonMap("c", "flink"); + checkAlterTableCompact( + parse("alter table tb1 partition (c = 'flink') compact", SqlDialect.DEFAULT), + staticPartitions); + + // compact partitioned table without partition spec + staticPartitions = Collections.emptyMap(); + checkAlterTableCompact( + parse("alter table tb1 compact", SqlDialect.DEFAULT), staticPartitions); } @Test @@ -1716,6 +1712,8 @@ private void prepareNonManagedTable(boolean hasConstraint) throws Exception { } private void prepareManagedTable(boolean hasPartition) throws Exception { + TestManagedTableFactory.MANAGED_TABLES.put( + ObjectIdentifier.of("cat1", "db1", "tb1"), new AtomicReference<>()); prepareTable(true, hasPartition, false); } @@ -1744,8 +1742,8 @@ private void prepareTable(boolean managedTable, boolean hasPartition, boolean ha Collections.unmodifiableMap(options)); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); - ObjectPath tablePath = new ObjectPath("db1", "tb1"); - catalog.createTable(tablePath, catalogTable, true); + ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1"); + catalogManager.createTable(catalogTable, tableIdentifier, true); } private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) { @@ -1759,6 +1757,28 @@ private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) { return plannerContext.createCalciteParser(); } + private void checkAlterTableCompact(Operation operation, Map staticPartitions) { + assertThat(operation).isInstanceOf(CatalogSinkModifyOperation.class); + CatalogSinkModifyOperation modifyOperation = (CatalogSinkModifyOperation) operation; + assertThat(modifyOperation.getStaticPartitions()) + .containsExactlyInAnyOrderEntriesOf(staticPartitions); + assertThat(modifyOperation.isOverwrite()).isFalse(); + assertThat(modifyOperation.getDynamicOptions()) + .containsEntry( + TestManagedTableFactory.ENRICHED_KEY, + TestManagedTableFactory.ENRICHED_VALUE); + assertThat(modifyOperation.getTableIdentifier()) + .isEqualTo(ObjectIdentifier.of("cat1", "db1", "tb1")); + assertThat(modifyOperation.getChild()).isInstanceOf(AlterTableCompactOperation.class); + AlterTableCompactOperation child = (AlterTableCompactOperation) modifyOperation.getChild(); + assertThat(child.getChildren()).isEmpty(); + assertThat(child.getCompactOptions()).containsEntry("k", "v"); + assertThat(child.getCompactOptions()) + .containsEntry( + TestManagedTableFactory.ENRICHED_KEY, + TestManagedTableFactory.ENRICHED_VALUE); + } + // ~ Inner Classes ---------------------------------------------------------- private static class TestItem { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java new file mode 100644 index 00000000000000..16de226091405a --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.table.utils.PartitionPathUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import static org.apache.flink.table.factories.TestManagedTableFactory.MANAGED_TABLES; +import static org.apache.flink.table.factories.TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; + +/** IT Case for testing managed table compaction. 
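It covers partition-spec validation, compaction of non-partitioned, single-partitioned and multi-partitioned managed tables, and idempotence of re-running the same compaction (verified through unchanged file modification times).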
*/ +public class CompactManagedTableITCase extends BatchTestBase { + + private final ObjectIdentifier tableIdentifier = + ObjectIdentifier.of(tEnv().getCurrentCatalog(), tEnv().getCurrentDatabase(), "MyTable"); + private final Map> collectedElements = new HashMap<>(); + + private Path rootPath; + private AtomicReference>> + referenceOfManagedTableFileEntries; + + @Override + @Before + public void before() { + super.before(); + MANAGED_TABLES.put(tableIdentifier, new AtomicReference<>()); + referenceOfManagedTableFileEntries = new AtomicReference<>(); + MANAGED_TABLE_FILE_ENTRIES.put(tableIdentifier, referenceOfManagedTableFileEntries); + try { + rootPath = + new Path( + new Path(TEMPORARY_FOLDER.newFolder().getPath()), + tableIdentifier.asSummaryString()); + rootPath.getFileSystem().mkdirs(rootPath); + } catch (IOException e) { + fail(String.format("Failed to create dir for %s", rootPath), e); + } + } + + @Override + @After + public void after() { + super.after(); + tEnv().executeSql("DROP TABLE MyTable"); + collectedElements.clear(); + try { + rootPath.getFileSystem().delete(rootPath, true); + } catch (IOException e) { + fail(String.format("Failed to delete dir for %s", rootPath), e); + } + } + + @Test + public void testCompactPartitionOnNonPartitionedTable() { + String sql = "CREATE TABLE MyTable (id BIGINT, content STRING)"; + tEnv().executeSql(sql); + assertThatThrownBy( + () -> + tEnv().executeSql( + "ALTER TABLE MyTable PARTITION (season = 'summer') COMPACT")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + String.format("Table %s is not partitioned.", tableIdentifier)); + } + + @Test + public void testCompactPartitionOnNonExistedPartitionKey() { + String sql = + "CREATE TABLE MyTable (\n" + + " id BIGINT,\n" + + " content STRING,\n" + + " season STRING\n" + + ") PARTITIONED BY (season)"; + tEnv().executeSql(sql); + assertThatThrownBy( + () -> + tEnv().executeSql( + "ALTER TABLE MyTable PARTITION (saeson = 'summer') COMPACT")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Partition column 'saeson' not defined in the table schema. 
" + + "Available ordered partition columns: ['season']"); + } + + @Test + public void testCompactPartitionOnNonExistedPartitionValue() throws Exception { + String sql = + "CREATE TABLE MyTable (\n" + + " id BIGINT,\n" + + " content STRING,\n" + + " season STRING\n" + + ") PARTITIONED BY (season)"; + prepare(sql, Collections.singletonList(of("season", "'spring'"))); + assertThatThrownBy( + () -> + tEnv().executeSql( + "ALTER TABLE MyTable PARTITION (season = 'summer') COMPACT")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Cannot resolve partition spec {season=summer}"); + } + + @Test + public void testCompactNonPartitionedTable() throws Exception { + String sql = "CREATE TABLE MyTable (id BIGINT, content STRING)"; + prepare(sql, Collections.emptyList()); + + // test compact table + CatalogPartitionSpec unresolvedDummySpec = new CatalogPartitionSpec(Collections.emptyMap()); + Set resolvedPartitionSpecsHaveBeenOrToBeCompacted = + Collections.singleton(unresolvedDummySpec); + executeAndCheck(unresolvedDummySpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + } + + @Test + public void testCompactSinglePartitionedTable() throws Exception { + String sql = + "CREATE TABLE MyTable (\n" + + " id BIGINT,\n" + + " content STRING,\n" + + " season STRING\n" + + ") PARTITIONED BY (season)"; + prepare(sql, Arrays.asList(of("season", "'spring'"), of("season", "'summer'"))); + + Set resolvedPartitionSpecsHaveBeenOrToBeCompacted = new HashSet<>(); + + // test compact one partition + CatalogPartitionSpec unresolvedPartitionSpec = + new CatalogPartitionSpec(of("season", "'summer'")); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "summer"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // test compact the whole table + unresolvedPartitionSpec = new CatalogPartitionSpec(Collections.emptyMap()); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "spring"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + } + + @Test + public void testCompactMultiPartitionedTable() throws Exception { + String sql = + "CREATE TABLE MyTable (" + + " id BIGINT,\n" + + " content STRING,\n" + + " season STRING,\n" + + " `month` INT\n" + + ") PARTITIONED BY (season, `month`)"; + + prepare( + sql, + Arrays.asList( + // spring + of("season", "'spring'", "`month`", "2"), + of("season", "'spring'", "`month`", "3"), + of("season", "'spring'", "`month`", "4"), + // summer + of("season", "'summer'", "`month`", "5"), + of("season", "'summer'", "`month`", "6"), + of("season", "'summer'", "`month`", "7"), + of("season", "'summer'", "`month`", "8"), + // autumn + of("season", "'autumn'", "`month`", "8"), + of("season", "'autumn'", "`month`", "9"), + of("season", "'autumn'", "`month`", "10"), + // winter + of("season", "'winter'", "`month`", "11"), + of("season", "'winter'", "`month`", "12"), + of("season", "'winter'", "`month`", "1"))); + + Set resolvedPartitionSpecsHaveBeenOrToBeCompacted = new HashSet<>(); + + // test compact one partition with full ordered partition spec + CatalogPartitionSpec unresolvedPartitionSpec = + new CatalogPartitionSpec(of("season", "'spring'", "`month`", "2")); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "spring", "month", "2"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // test compact one partition with 
full but disordered partition spec + unresolvedPartitionSpec = + new CatalogPartitionSpec(of("`month`", "3", "season", "'spring'")); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "spring", "month", "3"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // test compact multiple partitions with the subordinate partition spec + unresolvedPartitionSpec = new CatalogPartitionSpec(of("season", "'winter'")); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "winter", "month", "1"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "winter", "month", "11"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "winter", "month", "12"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // test compact one partition with the secondary partition spec + unresolvedPartitionSpec = new CatalogPartitionSpec(of("`month`", "5")); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "summer", "month", "5"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // test compact multiple partitions with the secondary partition spec + unresolvedPartitionSpec = new CatalogPartitionSpec(of("`month`", "8")); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "summer", "month", "8"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "autumn", "month", "8"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // test compact the whole table + unresolvedPartitionSpec = new CatalogPartitionSpec(Collections.emptyMap()); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "spring", "month", "4"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "summer", "month", "6"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "summer", "month", "7"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "autumn", "month", "9"))); + resolvedPartitionSpecsHaveBeenOrToBeCompacted.add( + new CatalogPartitionSpec(of("season", "autumn", "month", "10"))); + executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted); + } + + // ~ Tools ------------------------------------------------------------------ + + private void prepare(String managedTableDDL, List> partitionKVs) + throws Exception { + prepareMirrorTables(managedTableDDL); + prepareFileEntries(partitionKVs); + scanFileEntries(); + } + + private void prepareMirrorTables(String managedTableDDL) { + tEnv().executeSql(managedTableDDL); + String helperSource = + "CREATE TABLE HelperSource (id BIGINT, content STRING ) WITH (" + + " 'connector' = 'datagen', " + + " 'rows-per-second' = '5', " + + " 'fields.id.kind' = 'sequence', " + + " 'fields.id.start' = '0', " + + " 'fields.id.end' = '200', " + + " 'fields.content.kind' = 'random', " + + " 'number-of-rows' = '50')"; + String helperSink = + String.format( + "CREATE TABLE HelperSink WITH (" + + " 'connector' = 'filesystem', " + + " 'format' = 'testcsv', " + + " 'path' = '%s' )" + + "LIKE MyTable (EXCLUDING OPTIONS)", + rootPath.getPath()); + + 
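+ // The helper tables mirror the managed table's schema: a datagen source produces rows, and a filesystem sink declared with LIKE MyTable (EXCLUDING OPTIONS) writes them under the managed table's root path so that scanFileEntries() can register them as the table's file entries.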
tEnv().executeSql(helperSource); + tEnv().executeSql(helperSink); + } + + private void prepareFileEntries(List> partitionKVs) + throws Exception { + tEnv().executeSql(prepareInsertDML(partitionKVs)).await(); + } + + private static String prepareInsertDML(List> partitionKVs) { + StringBuilder dmlBuilder = new StringBuilder("INSERT INTO HelperSink\n"); + if (partitionKVs.isEmpty()) { + return dmlBuilder.append("SELECT id,\n content\nFROM HelperSource\n").toString(); + } + + for (int i = 0; i < partitionKVs.size(); i++) { + dmlBuilder.append("SELECT id,\n content,\n"); + int j = 0; + for (Map.Entry entry : partitionKVs.get(i).entrySet()) { + dmlBuilder.append(" "); + dmlBuilder.append(entry.getValue()); + dmlBuilder.append(" AS "); + dmlBuilder.append(entry.getKey()); + if (j < partitionKVs.get(i).size() - 1) { + dmlBuilder.append(",\n"); + } else { + dmlBuilder.append("\n"); + } + j++; + } + dmlBuilder.append("FROM HelperSource\n"); + if (i < partitionKVs.size() - 1) { + dmlBuilder.append("UNION ALL\n"); + } + } + return dmlBuilder.toString(); + } + + private void scanFileEntries() throws IOException { + Map> managedTableFileEntries = new HashMap<>(); + try (Stream pathStream = Files.walk(Paths.get(rootPath.getPath()))) { + pathStream + .filter(Files::isRegularFile) + .forEach( + filePath -> { + Path file = new Path(filePath.toString()); + CatalogPartitionSpec partitionSpec = + new CatalogPartitionSpec( + PartitionPathUtils.extractPartitionSpecFromPath( + file)); + // for non-partitioned table, the map is empty + List fileEntries = + managedTableFileEntries.getOrDefault( + partitionSpec, new ArrayList<>()); + fileEntries.add(file); + managedTableFileEntries.put(partitionSpec, fileEntries); + + List elements = + collectedElements.getOrDefault( + partitionSpec, new ArrayList<>()); + elements.addAll(readElementsFromFile(filePath.toFile())); + collectedElements.put(partitionSpec, elements); + }); + } + referenceOfManagedTableFileEntries.set(managedTableFileEntries); + } + + private static List readElementsFromFile(File file) { + List elements = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new FileReader(file))) { + String line; + while ((line = reader.readLine()) != null) { + elements.add(GenericRowData.of(StringData.fromString(line))); + } + } catch (IOException e) { + fail("This should not happen"); + } + return elements; + } + + private LinkedHashMap of(String... 
kvs) { + assert kvs != null && kvs.length % 2 == 0; + LinkedHashMap map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length - 1; i += 2) { + map.put(kvs[i], kvs[i + 1]); + } + return map; + } + + private static String prepareCompactSql(CatalogPartitionSpec unresolvedCompactPartitionSpec) { + String compactSqlTemplate = "ALTER TABLE MyTable%s COMPACT"; + Map partitionKVs = unresolvedCompactPartitionSpec.getPartitionSpec(); + StringBuilder sb = new StringBuilder(); + int index = 0; + for (Map.Entry entry : partitionKVs.entrySet()) { + if (index == 0) { + sb.append(" PARTITION ("); + } + sb.append(entry.getKey()); + sb.append(" = "); + sb.append(entry.getValue()); + if (index < partitionKVs.size() - 1) { + sb.append(", "); + } + if (index == partitionKVs.size() - 1) { + sb.append(")"); + } + index++; + } + return String.format(compactSqlTemplate, sb); + } + + private void executeAndCheck( + CatalogPartitionSpec unresolvedPartitionSpec, + Set resolvedPartitionSpecsHaveBeenOrToBeCompacted) + throws ExecutionException, InterruptedException { + String compactSql = prepareCompactSql(unresolvedPartitionSpec); + + // first run to check compacted file size and content + tEnv().executeSql(compactSql).await(); + Map firstRun = + checkFileAndElements(resolvedPartitionSpecsHaveBeenOrToBeCompacted); + + // second run to check idempotence + tEnv().executeSql(compactSql).await(); + Map secondRun = + checkFileAndElements(resolvedPartitionSpecsHaveBeenOrToBeCompacted); + checkModifiedTime(firstRun, secondRun); + } + + private Map checkFileAndElements( + Set resolvedPartitionSpecsHaveBeenOrToBeCompacted) { + Map lastModifiedForEachPartition = new HashMap<>(); + Map> managedTableFileEntries = + referenceOfManagedTableFileEntries.get(); + managedTableFileEntries.forEach( + (partitionSpec, fileEntries) -> { + if (resolvedPartitionSpecsHaveBeenOrToBeCompacted.contains(partitionSpec)) { + assertThat(fileEntries).hasSize(1); + Path compactedFile = fileEntries.get(0); + assertThat(compactedFile.getName()).startsWith("compact-"); + List compactedElements = + readElementsFromFile(new File(compactedFile.getPath())); + assertThat(compactedElements) + .hasSameElementsAs(collectedElements.get(partitionSpec)); + lastModifiedForEachPartition.put( + partitionSpec, getLastModifiedTime(compactedFile)); + } else { + // check remaining partitions are untouched + fileEntries.forEach( + file -> { + assertThat(file.getName()).startsWith("part-"); + List elements = + readElementsFromFile(new File(file.getPath())); + assertThat(collectedElements.get(partitionSpec)) + .containsAll(elements); + }); + } + }); + return lastModifiedForEachPartition; + } + + private void checkModifiedTime( + Map firstRun, Map secondRun) { + firstRun.forEach( + (partitionSpec, lastModified) -> + assertThat(secondRun.get(partitionSpec)) + .isEqualTo(lastModified) + .isNotEqualTo(-1L)); + } + + private static long getLastModifiedTime(Path compactedFile) { + try { + FileStatus status = compactedFile.getFileSystem().getFileStatus(compactedFile); + return status.getModificationTime(); + } catch (IOException e) { + fail("This should not happen"); + } + return -1L; + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 7386413a5d2e5a..e96e22c6e58f6d 100644 --- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory 
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,8 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.formats.testcsv.TestCsvFormatFactory -org.apache.flink.table.planner.factories.TestValuesTableFactory -org.apache.flink.table.planner.factories.TestFileFactory +org.apache.flink.table.factories.TestManagedTableFactory org.apache.flink.table.planner.factories.TableFactoryHarness$Factory +org.apache.flink.table.planner.factories.TestFileFactory +org.apache.flink.table.planner.factories.TestValuesTableFactory org.apache.flink.table.planner.plan.stream.sql.TestTableFactory +org.apache.flink.formats.testcsv.TestCsvFormatFactory diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CompactManagedTableTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CompactManagedTableTest.xml new file mode 100644 index 00000000000000..f3e7d5fb4d2669 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CompactManagedTableTest.xml @@ -0,0 +1,104 @@ + [104 added lines of expected AST / optimized plan XML for CompactManagedTableTest; markup not recoverable] diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 2dc12c09e6c0fe..c12111f4008489 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api import org.apache.flink.api.common.typeinfo.Types.STRING import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration +import org.apache.flink.core.testutils.FlinkMatchers.containsCause import org.apache.flink.streaming.api.environment.LocalStreamEnvironment import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, _} @@ -30,7 +31,9 @@ import org.apache.flink.table.module.ModuleEntry import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory._ import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction -import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId} +import org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator +import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId +import org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks} import org.apache.flink.table.types.DataType import org.apache.flink.types.Row @@ -341,7 +344,7 @@ class TableEnvironmentTest { } @Test - def testAlterTableCompactOnManagedTable(): Unit = { + def testAlterTableCompactOnManagedTableUnderStreamingMode(): Unit = { val statement = """ |CREATE TABLE MyTable ( @@ -352,8 +355,10 @@ """.stripMargin tableEnv.executeSql(statement) - assertEquals(ResultKind.SUCCESS, - 
tableEnv.executeSql("ALTER TABLE MyTable COMPACT").getResultKind) + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage( + "Compact managed table only works under batch mode.") + tableEnv.executeSql("alter table MyTable compact") } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CompactManagedTableTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CompactManagedTableTest.scala new file mode 100644 index 00000000000000..0ccc86c5bd24a8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CompactManagedTableTest.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.batch.sql + +import org.apache.flink.core.fs.Path +import org.apache.flink.table.catalog.{CatalogPartitionSpec, ObjectIdentifier} +import org.apache.flink.table.factories.TestManagedTableFactory +import org.apache.flink.table.planner.utils.TableTestBase +import org.junit.{After, Before, Test} + +import java.util +import java.util.Collections +import java.util.concurrent.atomic.AtomicReference + +class CompactManagedTableTest extends TableTestBase { + + private val tableIdentifier = + ObjectIdentifier.of("default_catalog", "default_database", "ManagedTable") + private val testUtil = batchTestUtil() + + @Before + def before(): Unit = { + val tableRef = new AtomicReference[util.Map[String, String]] + TestManagedTableFactory.MANAGED_TABLES.put( + tableIdentifier, tableRef) + val ddl = + """ + |CREATE TABLE ManagedTable ( + | a BIGINT, + | b INT, + | c VARCHAR + |) PARTITIONED BY (b, c) + """.stripMargin + testUtil.tableEnv.executeSql(ddl) + + val partitionKVs = new util.LinkedHashMap[String, String] + partitionKVs.put("b", "0") + partitionKVs.put("c", "flink") + val managedTableFileEntries = new util.HashMap[CatalogPartitionSpec, util.List[Path]]() + managedTableFileEntries.put( + new CatalogPartitionSpec(partitionKVs), + Collections.singletonList(new Path("/foo/bar/file"))) + val fileRef = new AtomicReference[util.Map[CatalogPartitionSpec, util.List[Path]]] + fileRef.set(managedTableFileEntries) + TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.put(tableIdentifier, fileRef) + } + + @After + def after(): Unit = { + val ddl = "DROP TABLE ManagedTable" + testUtil.tableEnv.executeSql(ddl) + TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.remove(tableIdentifier) + } + + @Test + def testExplainAlterTableCompactWithResolvedPartitionSpec(): Unit = { + val sql = "ALTER TABLE ManagedTable PARTITION (b = 0, c = 'flink') COMPACT" + testUtil.verifyExplainCompact(sql) + } + + @Test + def 
testExplainAlterTableCompactWithUnorderedPartitionSpec(): Unit = { + val sql = "ALTER TABLE ManagedTable PARTITION (c = 'flink', b = 0) COMPACT" + testUtil.verifyExplainCompact(sql) + } + + @Test + def testExplainAlterTableCompactWithoutSubordinatePartitionSpec(): Unit = { + val sql = "ALTER TABLE ManagedTable PARTITION (b = 0) COMPACT" + testUtil.verifyExplainCompact(sql) + } + + @Test + def testExplainAlterTableCompactWithoutSecondaryPartitionSpec(): Unit = { + val sql = "ALTER TABLE ManagedTable PARTITION (c = 'flink') COMPACT" + testUtil.verifyExplainCompact(sql) + } + + @Test + def testExplainAlterTableCompactWithoutPartitionSpec(): Unit = { + val sql = "ALTER TABLE ManagedTable COMPACT" + testUtil.verifyExplainCompact(sql) + } +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index c88189e398070c..db3eb8e598853a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -48,7 +48,7 @@ import org.apache.flink.table.functions._ import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} import org.apache.flink.table.planner.calcite.CalciteConfig -import org.apache.flink.table.planner.delegation.PlannerBase +import org.apache.flink.table.planner.delegation.{DefaultExecutor, PlannerBase} import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.operations.{InternalDataStreamQueryOperation, PlannerQueryOperation, RichTableSourceQueryOperation} import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner @@ -698,6 +698,23 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) verifyExplain(statSet, extraDetails: _*) } + /** + * Verify the explain result for the given ALTER TABLE COMPACT clause. + * The explain result will contain the extra [[ExplainDetail]]s. + */ + def verifyExplainCompact(compact: String): Unit = { + val operations = getTableEnv.asInstanceOf[TableEnvironmentImpl].getParser.parse(compact) + val relNode = TableTestUtil.toRelNode( + getTableEnv, + operations.get(0).asInstanceOf[ModifyOperation]) + assertPlanEquals( + Array(relNode), + Array.empty[ExplainDetail], + withRowType = false, + Array(PlanKind.AST, PlanKind.OPT_REL), + () => assertEqualsOrExpand("sql", compact)) + } + /** * Verify the explain result for the given [[Table]]. See more about [[Table#explain()]]. */ @@ -1619,7 +1636,7 @@ object TableTestUtil { val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inBatchMode().build() /** - * Converts operation tree in the given table to a RelNode tree. + * Convert operation tree in the given table to a RelNode tree. */ def toRelNode(table: Table): RelNode = { table.asInstanceOf[TableImpl] @@ -1628,6 +1645,16 @@ object TableTestUtil { .getRelBuilder.queryOperation(table.getQueryOperation).build() } + /** + * Convert modify operation to a RelNode tree. 
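+ * Used by verifyExplainCompact to translate the CatalogSinkModifyOperation produced by an ALTER TABLE ... COMPACT statement via the planner's translateToRel.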
+ */ + def toRelNode( + tEnv: TableEnvironment, + modifyOperation: ModifyOperation): RelNode = { + val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase] + planner.translateToRel(modifyOperation) + } + def createTemporaryView[T]( tEnv: TableEnvironment, name: String,
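For reference, a minimal sketch (not part of the patch) of how the statement added by this change is expected to be driven from the Table API, mirroring the flow exercised by CompactManagedTableITCase; the table and column names are hypothetical and a managed table factory is assumed to be discoverable on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AlterTableCompactExample {
    public static void main(String[] args) throws Exception {
        // compaction of managed tables is rejected in streaming mode, so use batch mode
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // no 'connector' option: the table is backed by the enabled managed table factory
        tEnv.executeSql(
                "CREATE TABLE ManagedTable (id BIGINT, content STRING, season STRING) "
                        + "PARTITIONED BY (season)");

        // compact a single partition; omitting the PARTITION clause compacts the whole table
        tEnv.executeSql("ALTER TABLE ManagedTable PARTITION (season = 'spring') COMPACT").await();
    }
}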