Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Enhancement] Materialized views support refresh granularity splits #12926

Merged
merged 11 commits into from Nov 8, 2022
Expand Up @@ -418,9 +418,10 @@ public Set<String> getUpdatedPartitionNamesOfTable(Table base) {
.getBaseTableVisibleVersionMap()
.computeIfAbsent(baseTable.getId(), k -> Maps.newHashMap());
Set<String> result = Sets.newHashSet();
// check whether there are partitions added
// check whether there are partitions added and have data
for (String partitionName : baseTable.getPartitionNames()) {
if (!baseTableVisibleVersionMap.containsKey(partitionName)) {
if (!baseTableVisibleVersionMap.containsKey(partitionName)
&& baseTable.getPartition(partitionName).getVisibleVersion() != 1) {
result.add(partitionName);
}
}
Expand Down Expand Up @@ -632,6 +633,13 @@ public String getMaterializedViewDdlStmt(boolean simple) {
sb.append(properties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TTL_NUMBER)).append("\"");
}

// partition refresh number
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER)) {
sb.append(StatsConstants.TABLE_PROPERTY_SEPARATOR).append(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER)
.append("\" = \"");
sb.append(properties.get(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER)).append("\"");
}

sb.append("\n)");
String define = this.getSimpleDefineSql();
if (StringUtils.isEmpty(define) || !simple) {
Expand Down
Expand Up @@ -50,7 +50,7 @@
*/
public class TableProperty implements Writable, GsonPostProcessable {
public static final String DYNAMIC_PARTITION_PROPERTY_PREFIX = "dynamic_partition";
public static final int NO_TTL = -1;
public static final int INVALID = -1;

@SerializedName(value = "properties")
private Map<String, String> properties;
Expand All @@ -60,7 +60,9 @@ public class TableProperty implements Writable, GsonPostProcessable {
private Short replicationNum = FeConstants.default_replication_num;

// partition time to live number, -1 means no ttl
private int partitionTTLNumber = NO_TTL;
private int partitionTTLNumber = INVALID;

private int partitionRefreshNumber = INVALID;

private boolean isInMemory = false;

Expand Down Expand Up @@ -154,7 +156,13 @@ public TableProperty buildReplicationNum() {

public TableProperty buildPartitionTTL() {
partitionTTLNumber = Integer.parseInt(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_PARTITION_TTL_NUMBER,
String.valueOf(NO_TTL)));
String.valueOf(INVALID)));
return this;
}

// Parses the per-run partition refresh batch size out of the raw property map;
// falls back to INVALID (-1) when "partition_refresh_number" is absent.
public TableProperty buildPartitionRefreshNumber() {
    String rawRefreshNumber = properties.getOrDefault(
            PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER, String.valueOf(INVALID));
    partitionRefreshNumber = Integer.parseInt(rawRefreshNumber);
    return this;
}

Expand Down Expand Up @@ -222,6 +230,14 @@ public int getPartitionTTLNumber() {
return partitionTTLNumber;
}

// Returns the configured partition refresh batch size; INVALID (-1) means unset.
public int getPartitionRefreshNumber() {
return partitionRefreshNumber;
}

// Overrides the partition refresh batch size (used when the property is set at
// materialized-view creation time).
public void setPartitionRefreshNumber(int partitionRefreshNumber) {
this.partitionRefreshNumber = partitionRefreshNumber;
}

// Returns whether the table is flagged as in-memory; defaults to false.
public boolean isInMemory() {
return isInMemory;
}
Expand Down Expand Up @@ -289,6 +305,7 @@ public void gsonPostProcess() throws IOException {
buildCompressionType();
buildWriteQuorum();
buildPartitionTTL();
buildPartitionRefreshNumber();
buildReplicatedStorage();
}
}
Expand Up @@ -64,7 +64,7 @@
import java.util.Objects;
import java.util.Set;

import static com.starrocks.catalog.TableProperty.NO_TTL;
import static com.starrocks.catalog.TableProperty.INVALID;

/**
* This class is used to periodically add or drop partition on an olapTable which specify dynamic partition properties
Expand Down Expand Up @@ -433,7 +433,7 @@ private void executePartitionTimeToLive() {
}

int ttlNumber = olapTable.getTableProperty().getPartitionTTLNumber();
if (Objects.equals(ttlNumber, NO_TTL)) {
if (Objects.equals(ttlNumber, INVALID)) {
iterator.remove();
LOG.warn("database={}, table={} have no ttl. remove it from scheduler", dbId, tableId);
continue;
Expand Down
Expand Up @@ -106,9 +106,10 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_ENABLE_STORAGE_CACHE = "enable_storage_cache";
public static final String PROPERTIES_STORAGE_CACHE_TTL = "storage_cache_ttl";
public static final String PROPERTIES_ALLOW_ASYNC_WRITE_BACK = "allow_async_write_back";

public static final String PROPERTIES_PARTITION_TTL_NUMBER = "partition_ttl_number";

public static final String PROPERTIES_PARTITION_REFRESH_NUMBER = "partition_refresh_number";

public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
throws AnalysisException {
if (properties == null) {
Expand Down Expand Up @@ -210,6 +211,22 @@ public static int analyzePartitionTimeToLive(Map<String, String> properties) thr
return partitionTimeToLive;
}

public static int analyzePartitionRefreshNumber(Map<String, String> properties) throws AnalysisException {
int partitionRefreshNumber = -1;
if (properties != null && properties.containsKey(PROPERTIES_PARTITION_REFRESH_NUMBER)) {
try {
partitionRefreshNumber = Integer.parseInt(properties.get(PROPERTIES_PARTITION_REFRESH_NUMBER));
} catch (NumberFormatException e) {
throw new AnalysisException("Partition Refresh Number: " + e.getMessage());
}
if (partitionRefreshNumber <= 0) {
throw new AnalysisException("Partition Refresh Number should larger than 0.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to unset refresh number?
what if the user do not specify the refresh number, what is the default logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature will do in Support alter materialized view properties or later PR

}
properties.remove(PROPERTIES_PARTITION_REFRESH_NUMBER);
}
return partitionRefreshNumber;
}

public static Short analyzeReplicationNum(Map<String, String> properties, short oldReplicationNum)
throws AnalysisException {
short replicationNum = oldReplicationNum;
Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Expand Up @@ -1273,7 +1273,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
boolean insertError = false;
try {
if (execPlan.getFragments().get(0).getSink() instanceof OlapTableSink) {
//if sink is OlapTableSink Assigned to Be execute this sql [cn execute OlapTableSink will crash]
// if sink is OlapTableSink Assigned to Be execute this sql [cn execute OlapTableSink will crash]
context.getSessionVariable().setPreferComputeNode(false);
context.getSessionVariable().setUseComputeNodes(0);
OlapTableSink dataSink = (OlapTableSink) execPlan.getFragments().get(0).getSink();
Expand All @@ -1289,9 +1289,9 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
List<ScanNode> scanNodes = execPlan.getScanNodes();

boolean containOlapScanNode = false;
for (int i = 0; i < scanNodes.size(); i++) {
if (scanNodes.get(i) instanceof OlapScanNode) {
estimateScanRows += ((OlapScanNode) scanNodes.get(i)).getActualRows();
for (ScanNode scanNode : scanNodes) {
if (scanNode instanceof OlapScanNode) {
estimateScanRows += ((OlapScanNode) scanNode).getActualRows();
containOlapScanNode = true;
}
}
Expand Down
Expand Up @@ -2,14 +2,13 @@

package com.starrocks.scheduler;

import java.util.HashMap;
import java.util.Map;

public class ExecuteOption {

private int priority = Constants.TaskRunPriority.LOWEST.value();
private boolean mergeRedundant = false;
private HashMap<String, String> taskRunProperties;
private Map<String, String> taskRunProperties;

public ExecuteOption() {
}
Expand All @@ -18,7 +17,7 @@ public ExecuteOption(int priority) {
this.priority = priority;
}

public ExecuteOption(int priority, boolean mergeRedundant, HashMap<String, String> taskRunProperties) {
public ExecuteOption(int priority, boolean mergeRedundant, Map<String, String> taskRunProperties) {
this.priority = priority;
this.mergeRedundant = mergeRedundant;
this.taskRunProperties = taskRunProperties;
Expand Down
Expand Up @@ -9,6 +9,9 @@ public class MvTaskRunContext extends TaskRunContext {
Map<String, Set<String>> baseToMvNameRef;
Map<String, Set<String>> mvToBaseNameRef;

String nextPartitionStart = null;
String nextPartitionEnd = null;

public MvTaskRunContext(TaskRunContext context) {
this.ctx = context.ctx;
this.definition = context.definition;
Expand All @@ -31,4 +34,24 @@ public Map<String, Set<String>> getMvToBaseNameRef() {
public void setMvToBaseNameRef(Map<String, Set<String>> mvToBaseNameRef) {
this.mvToBaseNameRef = mvToBaseNameRef;
}

// True when a follow-up refresh batch remains, i.e. both boundaries of the next
// partition window have been recorded on this context.
public boolean hasNextBatchPartition() {
return nextPartitionStart != null && nextPartitionEnd != null;
}

// Start boundary of the next refresh batch's partition window; null when there is none.
public String getNextPartitionStart() {
return nextPartitionStart;
}

// Records the start boundary of the next refresh batch's partition window.
public void setNextPartitionStart(String nextPartitionStart) {
this.nextPartitionStart = nextPartitionStart;
}

// End boundary of the next refresh batch's partition window; null when there is none.
public String getNextPartitionEnd() {
return nextPartitionEnd;
}

// Records the end boundary of the next refresh batch's partition window.
public void setNextPartitionEnd(String nextPartitionEnd) {
this.nextPartitionEnd = nextPartitionEnd;
}
}
Expand Up @@ -8,8 +8,11 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.FunctionCallExpr;
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.catalog.Column;
Expand All @@ -29,6 +32,8 @@
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.io.DeepCopy;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.RangeUtils;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.persist.ChangeMaterializedViewRefreshSchemeLog;
Expand Down Expand Up @@ -64,10 +69,12 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Core logic of materialized view refresh task run
Expand All @@ -87,6 +94,15 @@ public class PartitionBasedMaterializedViewRefreshProcessor extends BaseTaskRunP
// table id -> <base table info, snapshot table>
private Map<Long, Pair<MaterializedView.BaseTableInfo, Table>> snapshotBaseTables;

// Exposed for unit tests only; production code should not reach into the context.
@VisibleForTesting
public MvTaskRunContext getMvContext() {
return mvContext;
}
// Exposed for unit tests only; lets tests inject a pre-built context.
@VisibleForTesting
public void setMvContext(MvTaskRunContext mvContext) {
this.mvContext = mvContext;
}

// Core logics:
// 1. prepare to check some conditions
// 2. sync partitions with base tables(add or drop partitions, which will be optimized by dynamic partition creation later)
Expand Down Expand Up @@ -129,12 +145,14 @@ public void processTaskRun(TaskRunContext context) throws Exception {
// refresh external table meta cache
refreshExternalTable(context);
Set<String> partitionsToRefresh = getPartitionsToRefreshForMaterializedView(context.getProperties());
LOG.debug("materialized view partitions to refresh:{}", partitionsToRefresh);
if (partitionsToRefresh.isEmpty()) {
LOG.info("no partitions to refresh for materialized view {}", materializedView.getName());
return;
}
// Only refresh the first partition refresh number partitions, other partitions will generate new tasks
filterPartitionByRefreshNumber(partitionsToRefresh, materializedView);

LOG.debug("materialized view partitions to refresh:{}", partitionsToRefresh);
Map<String, Set<String>> sourceTablePartitions = getSourceTablePartitions(partitionsToRefresh);
LOG.debug("materialized view:{} source partitions :{}",
materializedView.getName(), sourceTablePartitions);
Expand All @@ -152,6 +170,86 @@ public void processTaskRun(TaskRunContext context) throws Exception {

// insert execute successfully, update the meta of materialized view according to ExecPlan
updateMeta(execPlan);

if (mvContext.hasNextBatchPartition()) {
generateNextTaskRun();
}
}

@VisibleForTesting
public void filterPartitionByRefreshNumber(Set<String> partitionsToRefresh, MaterializedView materializedView) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you test if one source base table partition has two target mv partitions can run successfully?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, this configuration has nothing to do with the source table, only the number of partitions of the materialized view table.

int partitionRefreshNumber = materializedView.getTableProperty().getPartitionRefreshNumber();
if (partitionRefreshNumber <= 0) {
return;
}
Map<String, Range<PartitionKey>> rangePartitionMap = materializedView.getRangePartitionMap();
if (partitionRefreshNumber >= rangePartitionMap.size()) {
return;
}
Map<String, Range<PartitionKey>> mappedPartitionsToRefresh = Maps.newHashMap();
for (String partitionName : partitionsToRefresh) {
mappedPartitionsToRefresh.put(partitionName, rangePartitionMap.get(partitionName));
}
LinkedHashMap<String, Range<PartitionKey>> sortedPartition = mappedPartitionsToRefresh.entrySet().stream()
.sorted(Map.Entry.comparingByValue(RangeUtils.RANGE_COMPARATOR))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));

Iterator<String> partitionNameIter = sortedPartition.keySet().iterator();
for (int i = 0; i < partitionRefreshNumber; i++) {
if (partitionNameIter.hasNext()) {
partitionNameIter.next();
}
}
String nextPartitionStart = null;
String endPartitionName = null;
if (partitionNameIter.hasNext()) {
String startPartitionName = partitionNameIter.next();
Range<PartitionKey> partitionKeyRange = mappedPartitionsToRefresh.get(startPartitionName);
LiteralExpr lowerExpr = partitionKeyRange.lowerEndpoint().getKeys().get(0);
nextPartitionStart = parseLiteralExprToDateString(lowerExpr, 0);
endPartitionName = startPartitionName;
partitionsToRefresh.remove(endPartitionName);
}
while (partitionNameIter.hasNext()) {
endPartitionName = partitionNameIter.next();
partitionsToRefresh.remove(endPartitionName);
}

mvContext.setNextPartitionStart(nextPartitionStart);
LiteralExpr upperExpr = mappedPartitionsToRefresh.get(endPartitionName).upperEndpoint().getKeys().get(0);
mvContext.setNextPartitionEnd(parseLiteralExprToDateString(upperExpr, 1));
}

// Renders a partition-boundary literal as a string shifted by `offset`:
// DateLiteral -> date formatted after adding `offset` days; IntLiteral -> numeric
// value plus `offset`; any other literal type -> null.
private String parseLiteralExprToDateString(LiteralExpr expr, int offset) {
    if (expr instanceof DateLiteral) {
        DateLiteral date = (DateLiteral) expr;
        return DateUtils.DATE_FORMATTER.format(date.toLocalDateTime().plusDays(offset));
    }
    if (expr instanceof IntLiteral) {
        long shifted = ((IntLiteral) expr).getLongValue() + offset;
        return String.valueOf(shifted);
    }
    return null;
}

// Submits a follow-up TaskRun pinned to the partition window recorded on mvContext,
// so the partitions deferred by filterPartitionByRefreshNumber get refreshed next.
private void generateNextTaskRun() {
    TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
    Map<String, String> properties = mvContext.getProperties();
    long mvId = Long.parseLong(properties.get(MV_ID));
    String taskName = TaskBuilder.getMvTaskName(mvId);

    // Copy every non-null property, then set the next partition window boundaries.
    Map<String, String> newProperties = Maps.newHashMap();
    properties.forEach((key, value) -> {
        if (value != null) {
            newProperties.put(key, value);
        }
    });
    newProperties.put(TaskRun.PARTITION_START, mvContext.getNextPartitionStart());
    newProperties.put(TaskRun.PARTITION_END, mvContext.getNextPartitionEnd());

    ExecuteOption option = new ExecuteOption(
            Constants.TaskRunPriority.LOWEST.value(), false, newProperties);
    taskManager.executeTask(taskName, option);
    LOG.info("Submit a generate taskRun for task:{}, partitionStart:{}, partitionEnd:{}", mvId,
            mvContext.getNextPartitionStart(), mvContext.getNextPartitionEnd());
}

private void refreshExternalTable(TaskRunContext context) {
Expand Down Expand Up @@ -588,7 +686,7 @@ private boolean checkBaseTablePartitionChange() {
}
}
} catch (UserException e) {
LOG.warn("Materialized view compute partition change failed : {}", e);
LOG.warn("Materialized view compute partition change failed", e);
return true;
} finally {
db.readUnlock();
Expand Down
Expand Up @@ -3316,6 +3316,12 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)
.put(PropertyAnalyzer.PROPERTIES_PARTITION_TTL_NUMBER, String.valueOf(number));
materializedView.getTableProperty().setPartitionTTLNumber(number);
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER)) {
int number = PropertyAnalyzer.analyzePartitionRefreshNumber(properties);
materializedView.getTableProperty().getProperties()
.put(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER, String.valueOf(number));
materializedView.getTableProperty().setPartitionRefreshNumber(number);
}
if (!properties.isEmpty()) {
// here, all properties should be checked
throw new DdlException("Unknown properties: " + properties);
Expand Down