[FLINK-25520][Table SQL/API] Implement "ALTER TABLE ... COMPACT" SQL #18394
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.

Automated Checks: last check on commit 6d75de0 (Tue Jan 18 11:28:32 UTC 2022). Warnings: none.

Mention the bot in a comment to re-run the automated checks.

Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
(Force-pushed from b8d38ca to 455dfd8)
Thanks @LadyForest for your contribution! Left some comments.
val flinkContext = context.unwrap(classOf[FlinkContext])
val config = flinkContext.getTableConfig.getConfiguration

val hints = new util.ArrayList[RelHint]
Can you just use CatalogSourceTable from FlinkCalciteCatalogReader? (You can refer to RelBuilder.) Then you can create a ToRelContext for it.
@@ -186,6 +195,44 @@ class FlinkRelBuilder(
    push(relNode)
    this
  }

  def compactScan(
Maybe just scan(ObjectIdentifier objectIdentifier, Map<String, String> dynamicOptions)?
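For illustration, the two entry points that suggestion implies could look like the following self-contained sketch. The class name TableScanBuilder and the string-based return value are stand-ins invented for this example; the real FlinkRelBuilder resolves the identifier against the catalog and produces a RelNode rather than a string.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the suggested FlinkRelBuilder scan entry points.
public class TableScanBuilder {

    // Plain scan: delegate with no dynamic options.
    public static String scan(String objectIdentifier) {
        return scan(objectIdentifier, Collections.emptyMap());
    }

    // Overload carrying dynamic options, which the planner would later
    // translate into OPTIONS hints on the scan node.
    public static String scan(String objectIdentifier, Map<String, String> dynamicOptions) {
        if (dynamicOptions.isEmpty()) {
            return "LogicalTableScan(table=[" + objectIdentifier + "])";
        }
        return "LogicalTableScan(table=[" + objectIdentifier + "], hints=[OPTIONS" + dynamicOptions + "])";
    }
}
```

The point of the shape is that a caller with no options and a caller injecting compaction options go through the same code path, differing only in the hint payload.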
private final CatalogPartitionSpec partitionSpec;
private final ResolvedCatalogTable resolvedManagedTable;
private final Map<String, String> compactOptions;

public AlterTableCompactOperation(
Maybe we can drop this class, just add dynamicOptions to CatalogQueryOperation.
If so, how can we tell the normal CatalogQueryOperation from ALTER TABLE COMPACT during the rel conversion in QueryOperationConverter (e.g. check the isBatch condition and inject dynamic options)?
/**
 * Notifies the listener that a table compaction occurred.
 *
 * @return dynamic options of the file entries under compaction for this table.
Dynamic options for source and sink? There is no file concept in Flink SQL.
flink-formats/flink-orc/pom.xml (outdated)
@@ -145,6 +145,19 @@ under the License.
    <scope>test</scope>
    <type>test-jar</type>
</dependency>
<dependency>
Why modify flink-orc, flink-parquet, flink-python and flink-sql-client?
TestManagedFactory can be removed from the org.apache.flink.table.factories.Factory file under the table-planner module to avoid adding a test-jar dependency. But flink-orc and flink-parquet cannot eliminate the guava test dependency; otherwise a NoClassDefFoundError will be thrown (this transitive dependency is introduced by Calcite).
TestManagedFactory can be removed from org.apache.flink.table.factories.Factory file under the table-planner module to avoid adding a test-jar dependency.
Where? Can you clarify where the changes led here?
Where? Can you clarify where the changes led here?
The following dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
is added to flink-orc, flink-parquet, flink-python and flink-sql-client in commit c53915a62cb7cf0cd70a6256a0d0a47dcd57a2a6 and removed in commit 3c2ac33e07cb54f2bb26df4878a8f5b93e7ad8a7.
I mean com.google.guava
(Force-pushed from 455dfd8 to cc5eaf1)
I just feel the testing classes are too complicated; can we simplify them? We only need to test what we need; there is no need for very strong features.
private static final long serialVersionUID = 1L;

private final Map<CatalogPartitionSpec, List<RowData>> toAdd;
private final Map<CatalogPartitionSpec, Set<Path>> toDelete;
CatalogPartitionSpec is not Serializable
CatalogPartitionSpec is not Serializable

TestManagedSinkCommittableSerializer takes care of CatalogPartitionSpec
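Because CatalogPartitionSpec is essentially a wrapper around a String-to-String partition map, a hand-written serializer can flatten it field by field instead of relying on Java serialization. A minimal self-contained sketch of that idea follows; plain java.io streams stand in for Flink's serialization utilities, and a raw map stands in for CatalogPartitionSpec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionSpecSerde {

    // Write the spec's key/value pairs explicitly; no Serializable needed.
    public static byte[] serialize(Map<String, String> partitionSpec) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(partitionSpec.size());
            for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuild the map in insertion order, mirroring serialize().
    public static Map<String, String> deserialize(byte[] serialized) {
        Map<String, String> partitionSpec = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                partitionSpec.put(in.readUTF(), in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionSpec;
    }
}
```

The round trip is deterministic because the entry count is written first, so the deserializer knows exactly how many pairs to read back.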
private final ObjectIdentifier tableIdentifier;
private final Path basePath;
private final transient RowDataEncoder encoder = new RowDataEncoder();
Don't add transient in a non-serializable class.
I've made some efforts to simplify the implementation. However, it's a little hard for us to avoid the overall overhead of implementing a source/sink (split, reader, enumerator, writer, committer, committable, etc.). I've split the static inner classes into top-level classes to achieve better readability and make each file smaller.
(Force-pushed from 9e19879 to 3c2ac33)
Hi @JingsongLi, thanks for your review. I've addressed your comments; please take a look when you're available, thanks :)
}

public SourceQueryOperation(
        ContextResolvedTable contextResolvedTable, Map<String, String> dynamicOptions) {
I feel it is better to use Nullable; scan(t) and scan(t, dynamicOptions) are different.
@@ -58,6 +65,7 @@ public String asSummaryString() {
    Map<String, Object> args = new LinkedHashMap<>();
    args.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString());
    args.put("fields", getResolvedSchema().getColumnNames());
    args.put("options", dynamicOptions);
Here we can ignore null dynamicOptions.
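One way to skip a null dynamicOptions when building the summary map, sketched with plain types. The class and method names here are illustrative stand-ins, not the actual SourceQueryOperation code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SummaryArgs {

    // Add the "options" entry only when dynamic options were supplied;
    // they are null for a plain scan without OPTIONS.
    public static Map<String, Object> build(String identifier, Map<String, String> dynamicOptions) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("identifier", identifier);
        if (dynamicOptions != null) {
            args.put("options", dynamicOptions);
        }
        return args;
    }
}
```

This keeps the summary string of an ordinary scan free of a noisy "options=null" entry.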
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.formats.testcsv.TestCsvFormatFactory
Why change order?
Why change order?

When removing TestManagedTableFactory from the file, I changed the order by mistake; I will sort them alphabetically.
Can you revert this? This looks like a pointless modification.
 * Committable which contains the generated compact files to be created and the old files to be
 * deleted.
 */
public class TestManagedCommittable implements Serializable {
TestManagedCommittable can not be Serializable, and I think RowData is not Serializable.
private static final int VERSION = 1;

private final DataOutputSerializer out = new DataOutputSerializer(64);
Don't reuse this DataOutputSerializer
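The concern is that a serializer buffer held in a field keeps its write position and contents across calls, so state can leak between invocations and the serializer is unsafe under concurrent use. A self-contained sketch of the per-call-allocation alternative, using plain java.io streams as a stand-in for Flink's DataOutputSerializer:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CommittableSerializer {

    // A fresh buffer per call: no write position or leftover bytes survive
    // between invocations, and concurrent calls cannot trample each other.
    public static byte[] serialize(String committable) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(64);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(committable);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

With a shared field instead, a second serialize() call would append after the first call's bytes unless the position were explicitly reset, which is exactly the bug class the review comment guards against.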
        StringData.fromString(filePath.getPath()),
        StringData.fromString(line)));
} catch (IOException e) {
    // ignored
Why ignore?
@Override
public boolean equals(Object o) {
    return super.equals(o);
equals and hashcode?
equals and hashcode?

Aha, I just copied it from the previous commit. I will change it.
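A conventional value-based equals/hashCode pair, as presumably intended here, looks like the following. The class and field names are illustrative stand-ins, not the actual members of the reviewed class.

```java
import java.util.Objects;

// Illustrative value class; the fields mirror the kind of state the
// reviewed class holds (identifiers, paths), not its exact members.
public class CompactFileEntry {
    private final String tableIdentifier;
    private final String filePath;

    public CompactFileEntry(String tableIdentifier, String filePath) {
        this.tableIdentifier = tableIdentifier;
        this.filePath = filePath;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CompactFileEntry that = (CompactFileEntry) o;
        return tableIdentifier.equals(that.tableIdentifier) && filePath.equals(that.filePath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tableIdentifier, filePath);
    }
}
```

Delegating to super.equals(o) on a class whose superclass is Object degenerates to reference equality, which silently breaks set/map membership checks in tests.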
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    throw new UnsupportedOperationException();

private static Pattern buildPartitionRegex(
Just use PartitionPathUtils?
 * Verify the explain result for the given ALTER TABLE COMPACT clause.
 * The explain result will contain the extra [[ExplainDetail]]s.
 */
def verifyExplainCompact(compact: String): Unit = {
verifyExplainSql?
(Force-pushed from 0ef6880 to 7da11c6)
@@ -39,9 +41,17 @@
public class SourceQueryOperation implements QueryOperation {

    private final ContextResolvedTable contextResolvedTable;
    private final Map<String, String> dynamicOptions;
Add @Nullable
@@ -72,4 +85,8 @@ public String asSummaryString() {
public <T> T accept(QueryOperationVisitor<T> visitor) {
    return visitor.visit(this);
}

public Map<String, String> getDynamicOptions() {
Add @Nullable
@@ -137,12 +140,12 @@
    catalogManager.getCurrentDatabase());
private final PlannerContext plannerContext =
        new PlannerContext(
                false,
Just modifying this is OK.
Can you split this into multiple commits?
Sure
LinkedHashMap<String, String> partitionKVs = alterTableCompact.getPartitionKVs();
CatalogPartitionSpec partitionSpec = null;
Map<String, String> partitionKVs = alterTableCompact.getPartitionKVs();
CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(Collections.emptyMap());
if (partitionKVs != null) {
    List<String> orderedPartitionKeys = resolvedCatalogTable.getPartitionKeys();
Minor: partitionKeys; no need for orderedPartitionKeys.
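The surrounding conversion builds a CatalogPartitionSpec from the user-supplied partition key/values, reordered to follow the table's declared partition keys. A self-contained sketch of just that reordering step; the class and method names are invented for this example, and validation of unknown keys is elided.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSpecs {

    // Reorder the user-specified partition key/values to follow the table's
    // declared partition-key order before wrapping them in a spec object.
    public static Map<String, String> reorder(Map<String, String> partitionKVs, List<String> partitionKeys) {
        Map<String, String> ordered = new LinkedHashMap<>();
        for (String key : partitionKeys) {
            String value = partitionKVs.get(key);
            if (value != null) {
                ordered.put(key, value);
            }
        }
        return ordered;
    }
}
```

Iterating over the declared keys rather than the user input is what makes PARTITION (hh = '08', dt = '2022-01-18') and PARTITION (dt = '2022-01-18', hh = '08') produce the same spec.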
import org.apache.flink.table.operations.ModifyOperation
import org.apache.flink.table.operations.Operation
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.table.operations.SinkModifyOperation
Your Scala style may be wrong somewhere... can you check it against the Flink Scala style?
(Force-pushed from a638e63 to dd9c367)
* Add compaction callback to ManagedTableFactory, ManagedTableListener and CatalogManager
* Let SqlToOperationConverter convert the compact clause to ModifyOperation
* Let SourceQueryOperation accept partition_spec as dynamic options
* Let QueryOperationConverter and FlinkRelBuilder translate dynamic options as hints
* Implement TestManagedTableFactory/TestManagedTableSource/TestManagedTableSink to support plan test
…MPACT" SQL
* Add TestManagedFileSourceReader, TestManagedFileSourceSplitEnumerator, TestManagedIterableSourceSplit, TestManagedFileSourceSplitSerializer
* Add TestManagedSinkWriter, TestManagedCommittable, TestManagedSinkCommitter, TestManagedSinkCommittableSerializer
(Force-pushed from dd9c367 to 4969111)
Thanks @LadyForest for the update. Looks good to me!
…MPACT" SQL

This closes apache#18394

* Add TestManagedFileSourceReader, TestManagedFileSourceSplitEnumerator, TestManagedIterableSourceSplit, TestManagedFileSourceSplitSerializer
* Add TestManagedSinkWriter, TestManagedCommittable, TestManagedSinkCommitter, TestManagedSinkCommittableSerializer
What is the purpose of the change
This PR aims to implement ALTER TABLE table_identifier [PARTITION partition_spec] COMPACT, which invokes a batch job to perform file compaction for the file store. More details can be found in FLIP-188.

Brief changelog
This PR contains two commits.

6a33451 Implement plan generation for "ALTER TABLE ... COMPACT" SQL

* Add compaction callback to ManagedTableFactory, ManagedTableListener and CatalogManager
* Let SqlToOperationConverter convert the compact clause to ModifyOperation
* Let SourceQueryOperation accept partition_spec as dynamic options
* Let QueryOperationConverter and FlinkRelBuilder translate dynamic options as hints
* Implement TestManagedTableFactory/TestManagedTableSource/TestManagedTableSink to support plan test

4969111 Implement ITCase for "ALTER TABLE ... COMPACT" SQL

* Add TestManagedFileSourceReader, TestManagedFileSourceSplitEnumerator, TestManagedIterableSourceSplit, TestManagedFileSourceSplitSerializer
* Add TestManagedSinkWriter, TestManagedCommittable, TestManagedSinkCommitter, TestManagedSinkCommittableSerializer
Verifying this change
This change added tests and can be verified as follows:
* SqlToOperationConverterTest#testAlterTableCompactOnManagedNonPartitionedTable and SqlToOperationConverterTest#testAlterTableCompactOnManagedPartitionedTable verify converting the SQL clause to the desired operation.
* TestManagedTableFactory#onCompactTable serves as a test impl method, which injects the compact.file-base-path and compact.file-entries options into the current managed table.
* CompactManagedTableTest verifies the plan.
* CompactManagedTableITCase uses the datagen and filesystem connectors to prepare some local files to compact; the compaction strategy rolls all files under each partition and recreates a file named with the pattern compact-${uuid}-file-0. The test checks non-partitioned/single-partitioned/multi-partitioned tables with/without a partition spec, and checks the idempotence of two successive compactions.

Does this pull request potentially affect one of the following parts:

@Public(Evolving): (yes / no)

Documentation