Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,52 @@
import org.apache.doris.common.Config;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.qe.ConnectContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Records column-level query-hit and filter-hit statistics from the Nereids physical plan.
* Called once per query in NereidsPlanner after plan translation.
*
* <p>Scope (Part 1):
* <ul>
* <li>queryHit: base SELECT columns whose ExprId flows straight through to the root
* plan's output without rewriting. Columns hidden by an alias, an expression,
* or an aggregate function are NOT recorded yet (Part 2).</li>
* <li>filterHit: columns referenced in WHERE predicate conjuncts.</li>
* <li>Only OlapTable scans are recorded; external tables (Hive, Iceberg, JDBC, …) are not.</li>
* <li>DML, EXPLAIN, and internal queries (e.g. auto-analyze) are skipped.</li>
* <li>Per query, each table's count is incremented at most once regardless of scan count.</li>
* </ul>
* GROUP BY, ORDER BY, window, JOIN, and aliased/projected columns are deferred to Part 2.
* <p>queryHit: SELECT output columns (alias-unwrapped), GROUP BY keys,
* ORDER BY keys, window PARTITION BY / ORDER BY keys, aggregate input columns.
* filterHit: WHERE predicate columns and JOIN ON conditions.
* Only OlapTable scans are recorded. DML, EXPLAIN, and internal queries are skipped.
* Per query each table's count is incremented at most once regardless of scan count.
*/
public class QueryStatsRecorder {
private static final Logger LOG = LogManager.getLogger(QueryStatsRecorder.class);
Expand Down Expand Up @@ -100,23 +113,32 @@ public static void record(PhysicalPlan plan, StatementContext stmtContext) {
*/
static Map<String, StatsDelta> collectDeltas(PhysicalPlan plan) {
Map<ExprId, PhysicalOlapScan> exprIdToScan = new HashMap<>();
Map<ExprId, String> exprIdToColName = new HashMap<>();
Map<String, StatsDelta> deltas = new HashMap<>();
walkPlan(plan, exprIdToScan, deltas);
walkPlan(plan, exprIdToScan, exprIdToColName, deltas);
if (exprIdToScan.isEmpty()) {
return deltas;
}
for (Slot slot : plan.getOutput()) {
if (!(slot instanceof SlotReference)) {
// queryHit: use getProjects() for PhysicalProject so Alias nodes are visible to unwrapAlias.
Iterable<? extends NamedExpression> rootExprs = (plan instanceof PhysicalProject)
? ((PhysicalProject<?>) plan).getProjects()
: plan.getOutput();
for (NamedExpression ne : rootExprs) {
SlotReference sr = unwrapAlias(ne);
if (sr == null) {
continue;
}
SlotReference sr = (SlotReference) slot;
PhysicalOlapScan sourceScan = exprIdToScan.get(sr.getExprId());
if (sourceScan == null) {
continue;
}
StatsDelta delta = getOrCreateDelta(deltas, sourceScan);
if (delta != null) {
sr.getOriginalColumn().ifPresent(col -> delta.addQueryStats(col.getName()));
String colName = sr.getOriginalColumn().map(col -> col.getName())
.orElseGet(() -> exprIdToColName.get(sr.getExprId()));
if (colName != null) {
delta.addQueryStats(colName);
}
}
}
return deltas;
Expand Down Expand Up @@ -149,54 +171,260 @@ static boolean shouldRecord(StatementContext ctx) {

/**
* Single-pass tree walk: registers scan output slots into exprIdToScan,
* and records filterHit for PhysicalFilter conjuncts.
* Children are visited before the current node so scans are registered
* before parent filters look them up.
* records filterHit for WHERE conjuncts, and records queryHit for
* GROUP BY / ORDER BY / window keys and aggregate input columns.
* Children are visited before the current node so scans are registered first.
* PhysicalLazyMaterializeOlapScan is checked before PhysicalOlapScan
* because it is a subclass; the inner scan's metadata must be used.
*/
private static void walkPlan(Plan plan,
Map<ExprId, PhysicalOlapScan> exprIdToScan,
Map<ExprId, String> exprIdToColName,
Map<String, StatsDelta> deltas) {
if (plan instanceof PhysicalStorageLayerAggregate) {
// COUNT(*)/MIN/MAX pushdown — the aggregate wraps the real scan but has no children.
PhysicalRelation inner = ((PhysicalStorageLayerAggregate) plan).getRelation();
if (inner instanceof PhysicalOlapScan) {
PhysicalOlapScan scan = (PhysicalOlapScan) inner;
for (Slot slot : scan.getOutput()) {
exprIdToScan.put(slot.getExprId(), scan);
if (slot instanceof SlotReference) {
registerColName(exprIdToColName, slot.getExprId(), (SlotReference) slot);
}
}
}
return;
}
if (plan instanceof PhysicalLazyMaterializeOlapScan) {
PhysicalOlapScan inner =
((PhysicalLazyMaterializeOlapScan) plan).getScan();
for (Slot slot : plan.getOutput()) {
exprIdToScan.put(slot.getExprId(), inner);
if (slot instanceof SlotReference) {
registerColName(exprIdToColName, slot.getExprId(), (SlotReference) slot);
}
}
return;
}
if (plan instanceof PhysicalOlapScan) {
PhysicalOlapScan scan = (PhysicalOlapScan) plan;
for (Slot slot : scan.getOutput()) {
exprIdToScan.put(slot.getExprId(), scan);
if (slot instanceof SlotReference) {
registerColName(exprIdToColName, slot.getExprId(), (SlotReference) slot);
}
}
return;
}
// TODO: PhysicalCTEConsumer slots use consumer-side ExprIds that differ from the producer
// scan's ExprIds, so CTE column stats are silently missed. Fix requires mapping consumer
// slots back to producer slots via StatementContext.getConsumerToProducerSlotMap().
for (Plan child : plan.children()) {
walkPlan(child, exprIdToScan, deltas);
walkPlan(child, exprIdToScan, exprIdToColName, deltas);
}
if (plan instanceof PhysicalFilter) {
PhysicalFilter<?> filter = (PhysicalFilter<?>) plan;
for (Expression conjunct : filter.getConjuncts()) {
conjunct.getInputSlots().forEach(slot -> {
if (!(slot instanceof SlotReference)) {
return;
recordInputSlotsAsFilterHit(conjunct, exprIdToScan, exprIdToColName, deltas);
}
}
// Lazy-materialized columns are not in PhysicalLazyMaterializeOlapScan.getOutput()
// (only operative slots + row-id are). The parent PhysicalLazyMaterialize exposes
// the lazy slots via getLazySlots(). Use each row-id — already registered by the
// child scan branch — to look up the source scan and register the lazy ExprIds.
if (plan instanceof PhysicalLazyMaterialize) {
PhysicalLazyMaterialize<?> lazy = (PhysicalLazyMaterialize<?>) plan;
List<Relation> rels = lazy.getRelations();
List<Slot> rowIds = lazy.getRowIds();
for (int i = 0; i < rels.size() && i < rowIds.size(); i++) {
PhysicalOlapScan sourceScan = exprIdToScan.get(rowIds.get(i).getExprId());
if (sourceScan == null) {
continue;
}
for (Slot lazySlot : lazy.getLazySlots(rels.get(i))) {
exprIdToScan.put(lazySlot.getExprId(), sourceScan);
if (lazySlot instanceof SlotReference) {
registerColName(exprIdToColName, lazySlot.getExprId(),
(SlotReference) lazySlot);
}
SlotReference sr = (SlotReference) slot;
PhysicalOlapScan sourceScan = exprIdToScan.get(sr.getExprId());
if (sourceScan == null) {
return;
}
}
}
if (plan instanceof Aggregate) {
Aggregate<?> agg = (Aggregate<?>) plan;
// GROUP BY keys
for (Expression expr : agg.getGroupByExpressions()) {
recordInputSlotsAsQueryHit(expr, exprIdToScan, exprIdToColName, deltas);
}
// Columns consumed by aggregate functions (e.g. k2 in SUM(k2))
for (NamedExpression expr : agg.getOutputExpressions()) {
recordInputSlotsAsQueryHit(expr, exprIdToScan, exprIdToColName, deltas);
}
}
if (plan instanceof AbstractPhysicalSort) {
for (OrderKey orderKey : ((AbstractPhysicalSort<?>) plan).getOrderKeys()) {
recordInputSlotsAsQueryHit(orderKey.getExpr(), exprIdToScan, exprIdToColName, deltas);
}
}
// PhysicalPartitionTopN does not extend AbstractPhysicalSort but also has ORDER BY and
// partition keys (used for row_number() / rank() per partition).
if (plan instanceof PhysicalPartitionTopN) {
PhysicalPartitionTopN<?> ptn = (PhysicalPartitionTopN<?>) plan;
for (Expression partKey : ptn.getPartitionKeys()) {
recordInputSlotsAsQueryHit(partKey, exprIdToScan, exprIdToColName, deltas);
}
for (OrderKey orderKey : ptn.getOrderKeys()) {
recordInputSlotsAsQueryHit(orderKey.getExpr(), exprIdToScan, exprIdToColName, deltas);
}
}
// PhysicalRepeat handles ROLLUP/CUBE: group sets are like GROUP BY keys.
if (plan instanceof PhysicalRepeat) {
PhysicalRepeat<?> repeat = (PhysicalRepeat<?>) plan;
for (List<Expression> groupSet : repeat.getGroupingSets()) {
for (Expression expr : groupSet) {
recordInputSlotsAsQueryHit(expr, exprIdToScan, exprIdToColName, deltas);
}
}
for (NamedExpression expr : repeat.getOutputExpressions()) {
recordInputSlotsAsQueryHit(expr, exprIdToScan, exprIdToColName, deltas);
}
}
if (plan instanceof PhysicalWindow) {
WindowFrameGroup wfg = ((PhysicalWindow<?>) plan).getWindowFrameGroup();
Set<Expression> partitionKeys = wfg.getPartitionKeys();
for (Expression expr : partitionKeys) {
recordInputSlotsAsQueryHit(expr, exprIdToScan, exprIdToColName, deltas);
}
for (OrderExpression orderExpr : wfg.getOrderKeys()) {
recordInputSlotsAsQueryHit(orderExpr.child(), exprIdToScan, exprIdToColName, deltas);
}
// queryHit for the window function value columns (e.g. k2 in SUM(k2) OVER (...)).
for (NamedExpression windowAlias : wfg.getGroups()) {
Expression windowExpr = windowAlias.child(0);
if (windowExpr instanceof WindowExpression) {
recordInputSlotsAsQueryHit(
((WindowExpression) windowExpr).getFunction(),
exprIdToScan, exprIdToColName, deltas);
}
}
}
// filterHit for JOIN ON conditions (hash equality and non-equality predicates).
if (plan instanceof AbstractPhysicalJoin) {
AbstractPhysicalJoin<?, ?> join = (AbstractPhysicalJoin<?, ?>) plan;
for (Expression conjunct : join.getHashJoinConjuncts()) {
recordInputSlotsAsFilterHit(conjunct, exprIdToScan, exprIdToColName, deltas);
}
for (Expression conjunct : join.getOtherJoinConjuncts()) {
recordInputSlotsAsFilterHit(conjunct, exprIdToScan, exprIdToColName, deltas);
}
}
// Propagate alias ExprIds for intermediate PhysicalProject nodes so that parent
// plan output slots (derived from aliases) resolve back to the original scan.
if (plan instanceof PhysicalProject) {
for (NamedExpression ne : ((PhysicalProject<?>) plan).getProjects()) {
if (exprIdToScan.containsKey(ne.getExprId())) {
continue; // plain slot pass-through — already registered by child scan
}
SlotReference underlying = unwrapAlias(ne);
if (underlying != null && !underlying.getExprId().equals(ne.getExprId())) {
// Simple alias: Alias(SlotRef) — propagate scan and column name.
PhysicalOlapScan scan = exprIdToScan.get(underlying.getExprId());
if (scan != null) {
exprIdToScan.put(ne.getExprId(), scan);
String colName = exprIdToColName.get(underlying.getExprId());
if (colName != null) {
exprIdToColName.put(ne.getExprId(), colName);
}
}
StatsDelta delta = getOrCreateDelta(deltas, sourceScan);
if (delta != null) {
sr.getOriginalColumn().ifPresent(col -> delta.addFilterStats(col.getName()));
} else if (underlying == null && ne instanceof Alias) {
// Complex alias: Alias(Cast(col), name) — created by
// PushDownExpressionsInHashCondition for type-mismatched join keys.
// If all input slots trace back to one scan, propagate to the alias ExprId.
Set<Slot> inputSlots = ((Alias) ne).child().getInputSlots();
if (inputSlots.size() == 1) {
Slot inputSlot = inputSlots.iterator().next();
PhysicalOlapScan scan = exprIdToScan.get(inputSlot.getExprId());
if (scan != null) {
exprIdToScan.put(ne.getExprId(), scan);
String colName = exprIdToColName.get(inputSlot.getExprId());
if (colName != null) {
exprIdToColName.put(ne.getExprId(), colName);
}
}
}
});
}
}
}
}

private static void recordInputSlotsAsQueryHit(Expression expr,
Map<ExprId, PhysicalOlapScan> exprIdToScan,
Map<ExprId, String> exprIdToColName,
Map<String, StatsDelta> deltas) {
for (Slot slot : expr.getInputSlots()) {
if (!(slot instanceof SlotReference)) {
continue;
}
SlotReference sr = (SlotReference) slot;
PhysicalOlapScan sourceScan = exprIdToScan.get(sr.getExprId());
if (sourceScan == null) {
continue;
}
StatsDelta delta = getOrCreateDelta(deltas, sourceScan);
if (delta != null) {
String colName = sr.getOriginalColumn().map(col -> col.getName())
.orElseGet(() -> exprIdToColName.get(sr.getExprId()));
if (colName != null) {
delta.addQueryStats(colName);
}
}
}
}

private static void recordInputSlotsAsFilterHit(Expression expr,
Map<ExprId, PhysicalOlapScan> exprIdToScan,
Map<ExprId, String> exprIdToColName,
Map<String, StatsDelta> deltas) {
for (Slot slot : expr.getInputSlots()) {
if (!(slot instanceof SlotReference)) {
continue;
}
SlotReference sr = (SlotReference) slot;
PhysicalOlapScan sourceScan = exprIdToScan.get(sr.getExprId());
if (sourceScan == null) {
continue;
}
StatsDelta delta = getOrCreateDelta(deltas, sourceScan);
if (delta != null) {
String colName = sr.getOriginalColumn().map(col -> col.getName())
.orElseGet(() -> exprIdToColName.get(sr.getExprId()));
if (colName != null) {
delta.addFilterStats(colName);
}
}
}
}

/** Registers a slot's column name into exprIdToColName for later fallback lookup. */
private static void registerColName(Map<ExprId, String> exprIdToColName,
ExprId exprId, SlotReference slot) {
String name = slot.getOriginalColumn().map(col -> col.getName()).orElse(slot.getName());
if (name != null) {
exprIdToColName.put(exprId, name);
}
}

/** Unwraps Alias chains to reach the underlying SlotReference; null for computed expressions. */
static SlotReference unwrapAlias(Expression expr) {
if (expr instanceof SlotReference) {
return (SlotReference) expr;
}
if (expr instanceof Alias) {
return unwrapAlias(((Alias) expr).child());
}
return null;
}

private static StatsDelta getOrCreateDelta(Map<String, StatsDelta> deltas,
PhysicalOlapScan scan) {
OlapTable t = scan.getTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ public static long getMergedReplicaStats(long replicaId) {
request.setType(TQueryStatsType.TABLET);
request.setReplicaId(replicaId);
for (TQueryStatsResult other : getStats(request)) {
queryHits += other.getTabletStats().get(replicaId);
Long remoteCount = other.getTabletStats().get(replicaId);
if (remoteCount != null) {
queryHits += remoteCount;
}
}
return queryHits;
}
Expand Down
Loading
Loading