Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f7e8559
[feat](ivm) Add mtmv increment refresh framework
yujun777 Mar 17, 2026
b021cbf
[feature](mtmv) add nereids ivm rewrite gate and tests
yujun777 Mar 17, 2026
110baae
[feature](ivm) add refresh manager entry point
yujun777 Mar 17, 2026
cac05b1
[ivm] make plan analysis an analysis artifact
yujun777 Mar 17, 2026
992fdc9
Add thin IVM stream metadata precheck
yujun777 Mar 17, 2026
cded64b
[ivm] fix IVMRefreshResult null contract, add toString and fallback l…
yujun777 Mar 17, 2026
43f6ba9
[ivm] replace DeltaPlanBundle description string with LogicalPlan and…
yujun777 Mar 18, 2026
e4cae57
[ivm] replace ivmPlanAnalysis with ivmDeltaBundles in CascadesContext
yujun777 Mar 18, 2026
31a50a2
[ivm] replace plan analyzer/dispatcher with Nereids delta rules
yujun777 Mar 18, 2026
fc61488
[ivm] refactor IVM Nereids rules: normalize + delta rule skeletons
yujun777 Mar 18, 2026
a260e59
[ivm] introduce IvmAnalyzeMode and split IVM session variables
yujun777 Mar 18, 2026
301e528
[ivm] implement IvmNormalizeMtmvPlan row-id injection and IvmContext
yujun777 Mar 18, 2026
be8f7ec
[ivm] wire row-id column into CreateMTMVInfo and add UTs
yujun777 Mar 18, 2026
5a40bb4
[ivm] make IVM MV UNIQUE_KEYS+MOW and add key-type UTs
yujun777 Mar 18, 2026
7549ead
Rename IVM delta bundle to command bundle
yujun777 Mar 18, 2026
a314c59
[ivm] remove mv_ prefix from IVM row-id column name
yujun777 Mar 18, 2026
ad2db09
[ivm] move delta calculation outside Nereids and add skeleton IvmDelt…
yujun777 Mar 18, 2026
b6d10b9
[ivm] implement scan & project-scan delta rewriting in IvmDeltaRewriter
yujun777 Mar 18, 2026
ff75070
[ivm] convert IVMDeltaExecutor and IVMCapabilityChecker from interfac…
yujun777 Mar 18, 2026
5d6e6bf
[ivm] integrate IVM refresh into MTMVTask run flow
yujun777 Mar 18, 2026
993d29c
[ivm] extract MTMVPlanUtil.executeCommand() and add audit log to IVMD…
yujun777 Mar 19, 2026
c68e852
fix ivm refresh insert table command fail
yujun777 Mar 19, 2026
5b0f7f2
[improvement](fe) Centralize IVM hidden column names
yujun777 Mar 19, 2026
bf400a3
[fix](fe) Remove unused MTMV imports
yujun777 Mar 19, 2026
aa71794
[fix](fe) Preserve IVM row id in incremental mtmv refresh
yujun777 Mar 22, 2026
7f78f77
[fix](fe) Refresh root fragment output exprs
yujun777 Mar 22, 2026
575af34
[fix](fe) Disable DML MV rewrite during MTMV refresh
yujun777 Mar 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ refreshSchedule
;

refreshMethod
: COMPLETE | AUTO
: COMPLETE | AUTO | INCREMENTAL
;

mvPartition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class Column implements GsonPostProcessable {
public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
public static final String ROWID_COL = "__DORIS_ROWID_COL__";
public static final String GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
public static final String IVM_ROW_ID_COL = "__DORIS_IVM_ROW_ID_COL__";
public static final String ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
public static final String VERSION_COL = "__DORIS_VERSION_COL__";
public static final String SKIP_BITMAP_COL = "__DORIS_SKIP_BITMAP_COL__";
Expand Down
24 changes: 24 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.ivm.IVMInfo;
import org.apache.doris.nereids.rules.analysis.SessionVarGuardRewriter;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
Expand All @@ -56,6 +58,7 @@
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -87,6 +90,8 @@ public class MTMV extends OlapTable {
private MTMVPartitionInfo mvPartitionInfo;
@SerializedName("rs")
private MTMVRefreshSnapshot refreshSnapshot;
@SerializedName("ii")
private IVMInfo ivmInfo;
// Should update after every fresh, not persist
// Cache with SessionVarGuardExpr: used when query session variables differ from MV creation variables
private MTMVCache cacheWithGuard;
Expand Down Expand Up @@ -120,6 +125,7 @@ public MTMV() {
this.mvPartitionInfo = params.mvPartitionInfo;
this.relation = params.relation;
this.refreshSnapshot = new MTMVRefreshSnapshot();
this.ivmInfo = new IVMInfo();
this.envInfo = new EnvInfo(-1L, -1L);
this.sessionVariables = params.sessionVariables;
mvRwLock = new ReentrantReadWriteLock(true);
Expand Down Expand Up @@ -437,6 +443,21 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
return refreshSnapshot;
}

public IVMInfo getIvmInfo() {
return ivmInfo;
}

public List<String> getInsertedColumnNames() {
List<Column> columns = getBaseSchema(true);
List<String> columnNames = Lists.newArrayListWithExpectedSize(columns.size());
for (Column column : columns) {
if (column.isVisible() || column.getName().startsWith("__DORIS_IVM_")) {
columnNames.add(column.getName());
}
}
return columnNames;
}

public long getSchemaChangeVersion() {
readMvLock();
try {
Expand Down Expand Up @@ -609,6 +630,9 @@ private void compatibleInternal(CatalogMgr catalogMgr) throws Exception {
@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
if (ivmInfo == null) {
ivmInfo = new IVMInfo();
}
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots = refreshSnapshot.getPartitionSnapshots();
compatiblePctSnapshot(partitionSnapshots);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,16 @@
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mtmv.ivm.IVMRefreshManager;
import org.apache.doris.mtmv.ivm.IVMRefreshResult;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -244,6 +242,21 @@ public void run() throws JobException {
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
}
// Attempt IVM refresh for incremental MVs and fall back when the plan is unsupported.
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.INCREMENTAL) {
IVMRefreshManager ivmRefreshManager = new IVMRefreshManager();
IVMRefreshResult ivmResult = ivmRefreshManager.doRefresh(mtmv);
if (ivmResult.isSuccess()) {
LOG.info("IVM incremental refresh succeeded for mv={}, taskId={}",
mtmv.getName(), getTaskId());
return;
}
LOG.warn("IVM refresh fell back for mv={}, reason={}, detail={}, taskId={}. "
+ "Continuing with partition-based refresh.",
mtmv.getName(), ivmResult.getFallbackReason(),
ivmResult.getDetailMessage(), getTaskId());
// TODO: it may cause too many full refresh, need limit full refresh here
}
Map<TableIf, String> tableWithPartKey = getIncrementalTableMap();
this.completedPartitions = Lists.newCopyOnWriteArrayList();
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
Expand Down Expand Up @@ -321,36 +334,20 @@ private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, Strin
private void exec(Set<String> refreshPartitionNames,
Map<TableIf, String> tableWithPartKey)
throws Exception {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
StatementContext statementContext = new StatementContext();
for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
statementContext.setSnapshot(entry.getKey(), entry.getValue());
}
ctx.setStatementContext(statementContext);
TUniqueId queryId = generateQueryId();
lastQueryId = DebugUtil.printId(queryId);
// if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey, statementContext);
try {
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
if (getStatus() == TaskStatus.CANCELED) {
// Throwing an exception to interrupt subsequent partition update tasks
throw new JobException("task is CANCELED");
}
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
} finally {
if (executor != null) {
AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames),
executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true);
}
boolean enableIvmNormalMTMVPlan = mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.INCREMENTAL;
executor = MTMVPlanUtil.executeCommand(mtmv, command, statementContext,
getDummyStmt(refreshPartitionNames), enableIvmNormalMTMVPlan);
lastQueryId = DebugUtil.printId(executor.getContext().queryId());
if (getStatus() == TaskStatus.CANCELED) {
throw new JobException("task is CANCELED");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.mtmv;

import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;

import java.util.List;
Expand All @@ -25,6 +26,8 @@ public class MTMVAnalyzeQueryInfo {
private MTMVRelation relation;
private MTMVPartitionInfo mvPartitionInfo;
private List<ColumnDefinition> columnDefinitions;
// set when IVM normalization is enabled
private Plan ivmNormalizedPlan;

public MTMVAnalyzeQueryInfo(List<ColumnDefinition> columnDefinitions, MTMVPartitionInfo mvPartitionInfo,
MTMVRelation relation) {
Expand All @@ -44,4 +47,12 @@ public MTMVPartitionInfo getMvPartitionInfo() {
public MTMVRelation getRelation() {
return relation;
}

public Plan getIvmNormalizedPlan() {
return ivmNormalizedPlan;
}

public void setIvmNormalizedPlan(Plan ivmNormalizedPlan) {
this.ivmNormalizedPlan = ivmNormalizedPlan;
}
}
Loading
Loading