3 changes: 3 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,6 +31,7 @@
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterTable"
"org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
"org.apache.flink.sql.parser.ddl.SqlAlterTableCompact"
"org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint"
"org.apache.flink.sql.parser.ddl.SqlAlterTableOptions"
"org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
@@ -110,6 +111,7 @@
"CATALOGS"
"CHANGELOG_MODE"
"COMMENT"
"COMPACT"
"COLUMNS"
"DATABASES"
"ENFORCED"
@@ -184,6 +186,7 @@
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
"COMPACT"
"CONDITIONAL"
"CONDITION_NUMBER"
"CONNECTION"
@@ -508,6 +508,7 @@ SqlAlterTable SqlAlterTable() :
SqlIdentifier newTableIdentifier = null;
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
SqlNodeList partitionSpec = null;
SqlIdentifier constraintName;
SqlTableConstraint constraint;
}
@@ -556,6 +557,17 @@
constraintName,
startPos.plus(getPos()));
}
|
[
<PARTITION>
{ partitionSpec = new SqlNodeList(getPos());
PartitionSpecCommaList(partitionSpec);
}
]
<COMPACT>
{
return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec);
}
)
}

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nullable;

import java.util.List;

/** ALTER TABLE [[catalogName.] databaseName.]tableName [PARTITION partition_spec] COMPACT. */
public class SqlAlterTableCompact extends SqlAlterTable {

public SqlAlterTableCompact(
SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
super(pos, tableName, partitionSpec);
}

public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName) {
super(pos, tableName);
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(tableIdentifier, partitionSpec);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("COMPACT");
}
}
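
As a quick illustration of the unparse logic above, here is a hedged sketch (not part of the PR) that builds the node directly; the identifier is made up, and the exact quoting in the printed SQL depends on the dialect the writer uses:

    import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;

    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.parser.SqlParserPos;

    import java.util.Arrays;

    public class CompactNodeSketch {
        public static void main(String[] args) {
            SqlIdentifier table = new SqlIdentifier(Arrays.asList("db1", "t1"), SqlParserPos.ZERO);
            // Short form without a partition spec.
            SqlAlterTableCompact node = new SqlAlterTableCompact(SqlParserPos.ZERO, table);
            // SqlAlterTable#unparse writes "ALTER TABLE" plus the identifier (and the
            // partition spec when one is set); the override above then appends COMPACT.
            System.out.println(node); // e.g. ALTER TABLE `db1`.`t1` COMPACT
        }
    }
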
@@ -287,6 +287,21 @@ public void testAlterTableReset() {
sql("alter table t1 reset()").ok("ALTER TABLE `T1` RESET (\n)");
}

@Test
public void testAlterTableCompact() {
sql("alter table t1 compact").ok("ALTER TABLE `T1` COMPACT");

sql("alter table db1.t1 compact").ok("ALTER TABLE `DB1`.`T1` COMPACT");

sql("alter table cat1.db1.t1 compact").ok("ALTER TABLE `CAT1`.`DB1`.`T1` COMPACT");

sql("alter table t1 partition(x='y',m='n') compact")
.ok("ALTER TABLE `T1` PARTITION (`X` = 'y', `M` = 'n') COMPACT");

sql("alter table t1 partition(^)^ compact")
.fails("(?s).*Encountered \"\\)\" at line 1, column 26.\n.*");
}

@Test
public void testCreateTable() {
final String sql =
@@ -112,6 +112,7 @@
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
@@ -998,6 +999,8 @@ public TableResultInternal executeInternal(Operation operation) {
for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
catalog.dropPartition(tablePath, spec, ifExists);
}
} else if (alterTableOperation instanceof AlterTableCompactOperation) {
// TODO: FLINK-25176 work with managed table
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
@@ -79,7 +79,9 @@ public void notifyTableDrop(
}
}

-private boolean isManagedTable(@Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
+/** Checks whether a resolved catalog table is a Flink managed table. */
+public static boolean isManagedTable(
+        @Nullable Catalog catalog, ResolvedCatalogBaseTable<?> table) {
if (catalog == null || !catalog.supportsManagedTable()) {
// catalog not support managed table
return false;
@@ -33,6 +33,6 @@ public String getCatalogName() {

@Override
public String asSummaryString() {
return String.format("USE CATALOGS %s", catalogName);
return String.format("USE CATALOG %s", catalogName);
}
}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.OperationUtils;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Operation to describe an "ALTER TABLE [PARTITION partition_spec] COMPACT" statement. */
public class AlterTableCompactOperation extends AlterTableOperation {

private final CatalogPartitionSpec partitionSpec;

public AlterTableCompactOperation(
ObjectIdentifier tableIdentifier, @Nullable CatalogPartitionSpec partitionSpec) {
super(tableIdentifier);
this.partitionSpec = partitionSpec;
}

public Map<String, String> getPartitionSpec() {
return partitionSpec == null
? Collections.emptyMap()
: new LinkedHashMap<>(partitionSpec.getPartitionSpec());
}

@Override
public String asSummaryString() {
String spec =
partitionSpec == null
? ""
: String.format(
"PARTITION (%s) ",
OperationUtils.formatPartitionSpec(partitionSpec));
return String.format("ALTER TABLE %s %sCOMPACT", tableIdentifier.asSummaryString(), spec);
}
}
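
A hedged usage sketch (not in the PR) showing both summary-string shapes; the identifiers are invented, and the exact partition formatting is delegated to OperationUtils.formatPartitionSpec:

    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;

    import java.util.LinkedHashMap;

    public class CompactOperationSketch {
        public static void main(String[] args) {
            ObjectIdentifier id = ObjectIdentifier.of("cat1", "db1", "t1");

            // Whole-table compaction: no partition spec.
            System.out.println(new AlterTableCompactOperation(id, null).asSummaryString());
            // e.g. ALTER TABLE `cat1`.`db1`.`t1` COMPACT

            // Single-partition compaction.
            LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
            kvs.put("dt", "2022-01-01");
            System.out.println(
                    new AlterTableCompactOperation(id, new CatalogPartitionSpec(kvs))
                            .asSummaryString());
            // e.g. ALTER TABLE `cat1`.`db1`.`t1` PARTITION (dt=2022-01-01) COMPACT
        }
    }
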
@@ -36,6 +36,8 @@ public String getConstraintName() {

@Override
public String asSummaryString() {
return String.format("ALTER TABLE %s DROP CONSTRAINT %s", tableIdentifier, constraintName);
return String.format(
"ALTER TABLE %s DROP CONSTRAINT %s",
tableIdentifier.asSummaryString(), constraintName);
}
}
@@ -25,6 +25,7 @@
import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
import org.apache.flink.sql.parser.ddl.SqlAlterTable;
import org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint;
import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
@@ -91,10 +92,13 @@
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ManagedTableListener;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
@@ -129,6 +133,7 @@
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableCompactOperation;
import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -169,6 +174,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -392,8 +398,7 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
throw new ValidationException(
String.format(
"Table %s doesn't exist or is a temporary table.",
tableIdentifier.toString()));
"Table %s doesn't exist or is a temporary table.", tableIdentifier));
}
CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
if (baseTable instanceof CatalogView) {
@@ -485,6 +490,11 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
specs.add(new CatalogPartitionSpec(dropPartitions.getPartitionKVs(i)));
}
return new DropPartitionsOperation(tableIdentifier, dropPartitions.ifExists(), specs);
} else if (sqlAlterTable instanceof SqlAlterTableCompact) {
ResolvedCatalogTable resolvedCatalogTable =
(ResolvedCatalogTable) optionalCatalogTable.get().getResolvedTable();
return convertAlterTableCompact(
tableIdentifier, resolvedCatalogTable, (SqlAlterTableCompact) sqlAlterTable);
} else {
throw new ValidationException(
String.format(
@@ -532,16 +542,57 @@ private Operation convertAlterTableReset(
CatalogTable oldTable,
SqlAlterTableReset alterTableReset) {
Map<String, String> newOptions = new HashMap<>(oldTable.getOptions());
-// reset empty key is not allowed
+// reset empty or 'connector' key is not allowed
Set<String> resetKeys = alterTableReset.getResetKeys();
-if (resetKeys.isEmpty()) {
-    throw new ValidationException("ALTER TABLE RESET does not support empty key");
+if (resetKeys.isEmpty() || resetKeys.contains(FactoryUtil.CONNECTOR.key())) {
+    String exMsg =
+            resetKeys.isEmpty()
+                    ? "ALTER TABLE RESET does not support empty key"
+                    : "ALTER TABLE RESET does not support changing 'connector'";
+    throw new ValidationException(exMsg);
}
// reset table option keys
resetKeys.forEach(newOptions::remove);
return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(newOptions));
}
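
The tightened check above now rejects the 'connector' key as well as an empty key list; a hedged sketch of the observable behavior, assuming a TableEnvironment tEnv with a table t1 already registered (names are made up):

    tEnv.executeSql("ALTER TABLE t1 RESET ()");
    // -> ValidationException: ALTER TABLE RESET does not support empty key
    tEnv.executeSql("ALTER TABLE t1 RESET ('connector')");
    // -> ValidationException: ALTER TABLE RESET does not support changing 'connector'
    tEnv.executeSql("ALTER TABLE t1 RESET ('k')"); // removes option 'k' from the table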

private Operation convertAlterTableCompact(
ObjectIdentifier tableIdentifier,
ResolvedCatalogTable resolvedCatalogTable,
SqlAlterTableCompact alterTableCompact) {
Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
if (ManagedTableListener.isManagedTable(catalog, resolvedCatalogTable)) {
LinkedHashMap<String, String> partitionKVs = alterTableCompact.getPartitionKVs();
CatalogPartitionSpec partitionSpec = null;
if (partitionKVs != null) {
List<String> orderedPartitionKeys = resolvedCatalogTable.getPartitionKeys();
Set<String> validPartitionKeySet = new HashSet<>(orderedPartitionKeys);
String exMsg =
orderedPartitionKeys.isEmpty()

[Review comment, Contributor] I thought about it; maybe it doesn't have to be ordered, just hitting the specified partition is OK. We could do something like DynamicSinkUtils.validatePartitioning.

? String.format("Table %s is not partitioned.", tableIdentifier)
: String.format(
"Available ordered partition columns: [%s]",
orderedPartitionKeys.stream()
.collect(Collectors.joining("', '", "'", "'")));
partitionKVs.forEach(
(partitionKey, partitionValue) -> {
if (!validPartitionKeySet.contains(partitionKey)) {
throw new ValidationException(
String.format(
"Partition column '%s' not defined in the table schema. %s",
partitionKey, exMsg));
}
});
partitionSpec = new CatalogPartitionSpec(partitionKVs);
}
return new AlterTableCompactOperation(tableIdentifier, partitionSpec);
}
throw new ValidationException(
String.format(
"ALTER TABLE COMPACT operation is not supported for non-managed table %s",
tableIdentifier));
}
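
To make the validation paths above concrete, a hedged behavior sketch; managed_t is assumed to be a managed table partitioned by (dt, hh), kafka_t an ordinary connector table in the default catalog and database, and tEnv a TableEnvironment:

    tEnv.executeSql("ALTER TABLE managed_t COMPACT"); // OK: compact the whole table
    tEnv.executeSql("ALTER TABLE managed_t PARTITION (dt = '2022-01-01') COMPACT"); // OK
    tEnv.executeSql("ALTER TABLE managed_t PARTITION (other = '1') COMPACT");
    // -> ValidationException: Partition column 'other' not defined in the table schema.
    //    Available ordered partition columns: ['dt', 'hh']
    tEnv.executeSql("ALTER TABLE kafka_t COMPACT");
    // -> ValidationException: ALTER TABLE COMPACT operation is not supported for
    //    non-managed table `default_catalog`.`default_database`.`kafka_t`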

/** Convert CREATE FUNCTION statement. */
private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
UnresolvedIdentifier unresolvedIdentifier =
@@ -21,7 +21,6 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
-import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,7 +32,7 @@

/**
* A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the partition
-* to/from JSON, but also can write partitioned data for {@link SupportsWritingMetadata}.
+* to/from JSON, but also can write partitioned data for {@link SupportsPartitioning}.
*/
@JsonTypeName("Partitioning")
public class PartitioningSpec implements SinkAbilitySpec {