Skip to content
6 changes: 5 additions & 1 deletion docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ All available procedures are listed below.
<li>order_strategy(optional): 'order' or 'zorder' or 'hilbert' or 'none'.</li>
<li>order_by(optional): the columns need to be sort. Left empty if 'order_strategy' is 'none'.</li>
<li>options(optional): additional dynamic options of the table.</li>
<li>where(optional): partition predicate (cannot be used together with "partitions"). Note: since <code>where</code> is a reserved keyword, it must be wrapped in a pair of backticks, e.g. `where`.</li>
</td>
<td>
CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
-- use partition filter <br/>
CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') <br/>
-- use partition predicate <br/>
CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E

public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
throws Exception {
return call(procedureContext, tableId, partitions, "", "", "");
return call(procedureContext, tableId, partitions, "", "", "", "");
Comment thread
wg1026688210 marked this conversation as resolved.
}

public String[] call(
Expand All @@ -66,7 +66,7 @@ public String[] call(
String orderStrategy,
String orderByColumns)
throws Exception {
return call(procedureContext, tableId, partitions, orderStrategy, orderByColumns, "");
return call(procedureContext, tableId, partitions, orderStrategy, orderByColumns, "", "");
}

public String[] call(
Expand All @@ -77,6 +77,26 @@ public String[] call(
String orderByColumns,
String tableOptions)
throws Exception {
return call(
procedureContext,
tableId,
partitions,
orderStrategy,
orderByColumns,
tableOptions,
"");
}

public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String orderStrategy,
String orderByColumns,
String tableOptions,
String whereSql)
throws Exception {

String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
Expand Down Expand Up @@ -115,6 +135,10 @@ public String[] call(
action.withPartitions(ParameterUtils.getPartitions(partitions.split(";")));
}

if (!StringUtils.isBlank(whereSql)) {
action.withWhereSql(whereSql);
}

return execute(procedureContext, action, jobName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,10 @@ public void testCallCompact() {
.doesNotThrowAnyException();
assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '', 'sink.parallelism=1')"))
.doesNotThrowAnyException();
assertThatCode(
() ->
sql(
"CALL sys.compact('default.T', '', '', '', 'sink.parallelism=1','pt=1')"))
.doesNotThrowAnyException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,40 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;

/** Table compact action for Flink. */
public class CompactAction extends TableActionBase {

private static final Logger LOGGER = LoggerFactory.getLogger(CompactAction.class);

private List<Map<String, String>> partitions;

private String whereSql;

public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
}
Expand Down Expand Up @@ -72,8 +85,13 @@ public CompactAction withPartitions(List<Map<String, String>> partitions) {
return this;
}

/**
 * Sets a SQL-like partition predicate (e.g. {@code "dt>10 and h<20"}) used to select
 * the partitions to compact. Mutually exclusive with explicitly specified partitions;
 * the conflict is checked later in {@code getPredicate()}.
 *
 * @param whereSql the predicate text; only partition keys may be referenced
 * @return this action, for chaining
 */
public CompactAction withWhereSql(String whereSql) {
    this.whereSql = whereSql;
    return this;
}

@Override
public void build() {
public void build() throws Exception {
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
Expand All @@ -94,34 +112,69 @@ public void build() {
}

private void buildForTraditionalCompaction(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);

sourceBuilder.withPartitions(partitions);
sourceBuilder.withPartitionPredicate(getPredicate());
DataStreamSource<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
sinkBuilder.withInput(source).build();
}

private void buildForUnawareBucketCompaction(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
new UnawareBucketCompactionTopoBuilder(env, identifier.getFullName(), table);

unawareBucketCompactionTopoBuilder.withPartitions(partitions);
unawareBucketCompactionTopoBuilder.withPartitionPredicate(getPredicate());
unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
unawareBucketCompactionTopoBuilder.build();
}

/**
 * Resolves the partition filter for this compaction into a {@link Predicate}.
 *
 * <p>Exactly one of the two filter forms may be used: an explicit partition list
 * (combined with OR) or a SQL-like where clause. The resulting predicate is
 * validated to reference only partition keys.
 *
 * @return the resolved predicate, or {@code null} if no filter was specified
 * @throws Exception if the where clause cannot be parsed
 * @throws IllegalArgumentException if both filter forms are set, or the predicate
 *     references a non-partition column
 */
protected Predicate getPredicate() throws Exception {
    Preconditions.checkArgument(
            partitions == null || whereSql == null,
            "partitions and where cannot be used together.");
    Predicate predicate = null;
    if (partitions != null) {
        // OR together one equality predicate per specified partition spec.
        predicate =
                PredicateBuilder.or(
                        partitions.stream()
                                .map(
                                        p ->
                                                createPartitionPredicate(
                                                        p,
                                                        table.rowType(),
                                                        ((FileStoreTable) table)
                                                                .coreOptions()
                                                                .partitionDefaultName()))
                                .toArray(Predicate[]::new));
    } else if (whereSql != null) {
        SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
                new SimpleSqlPredicateConvertor(table.rowType());
        predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
    }

    // Check whether the predicate contains a non-partition key.
    if (predicate != null) {
        LOGGER.info("the partition predicate of compaction is {}", predicate);
        PartitionPredicateVisitor partitionPredicateVisitor =
                new PartitionPredicateVisitor(table.partitionKeys());
        Preconditions.checkArgument(
                predicate.visit(partitionPredicateVisitor),
                "Only partition key can be specialized in compaction action.");
    }

    return predicate;
}

/**
 * Builds the compaction topology and submits it as a Flink job named "Compact job".
 *
 * @throws Exception if building the topology or executing the job fails
 */
@Override
public void run() throws Exception {
    build();
    execute("Compact job");
}

/**
 * Returns the explicitly specified partitions to compact, or {@code null}
 * if none were set (i.e. all partitions, or a where clause is used instead).
 */
public List<Map<String, String>> getPartitions() {
    return partitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class CompactActionFactory implements ActionFactory {
private static final String ORDER_STRATEGY = "order_strategy";
private static final String ORDER_BY = "order_by";

private static final String WHERE = "where";

@Override
public String identifier() {
return IDENTIFIER;
Expand Down Expand Up @@ -67,6 +69,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
if (params.has(PARTITION)) {
List<Map<String, String>> partitions = getPartitions(params);
action.withPartitions(partitions);
} else if (params.has(WHERE)) {
action.withWhereSql(params.get(WHERE));
}

return Optional.of(action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

Expand All @@ -44,8 +42,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;

/** Compact with sort action. */
public class SortCompactAction extends CompactAction {

Expand All @@ -72,7 +68,7 @@ public void run() throws Exception {
}

@Override
public void build() {
public void build() throws Exception {
// only support batch sort yet
if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
!= RuntimeExecutionMode.BATCH) {
Expand All @@ -96,21 +92,7 @@ public void build() {
identifier.getObjectName())
.asSummaryString());

if (getPartitions() != null) {
Predicate partitionPredicate =
PredicateBuilder.or(
getPartitions().stream()
.map(
p ->
createPartitionPredicate(
p,
table.rowType(),
((FileStoreTable) table)
.coreOptions()
.partitionDefaultName()))
.toArray(Predicate[]::new));
sourceBuilder.predicate(partitionPredicate);
}
sourceBuilder.predicate(getPredicate());

String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
Expand All @@ -34,11 +35,6 @@

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;

/**
* Build for unaware-bucket table flink compaction job.
*
Expand All @@ -53,9 +49,11 @@ public class UnawareBucketCompactionTopoBuilder {
private final transient StreamExecutionEnvironment env;
private final String tableIdentifier;
private final FileStoreTable table;
@Nullable private List<Map<String, String>> specifiedPartitions = null;

private boolean isContinuous = false;

@Nullable private Predicate partitionPredicate;

public UnawareBucketCompactionTopoBuilder(
StreamExecutionEnvironment env, String tableIdentifier, FileStoreTable table) {
this.env = env;
Expand All @@ -67,8 +65,8 @@ public void withContinuousMode(boolean isContinuous) {
this.isContinuous = isContinuous;
}

public void withPartitions(List<Map<String, String>> partitions) {
this.specifiedPartitions = partitions;
public void withPartitionPredicate(Predicate predicate) {
this.partitionPredicate = predicate;
}

public void build() {
Expand All @@ -93,15 +91,7 @@ private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
table,
isContinuous,
scanInterval,
specifiedPartitions != null
? createPartitionPredicate(
specifiedPartitions,
table.rowType(),
table.coreOptions().partitionDefaultName())
: null);
table, isContinuous, scanInterval, partitionPredicate);

return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
}
Expand Down
Loading