Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]Truncate lake table #8886

Merged
merged 9 commits into from Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/LakeTable.java
Expand Up @@ -7,20 +7,25 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DistributionInfo;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.TableIndexes;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.DdlException;
import com.starrocks.common.io.DeepCopy;
import com.starrocks.common.io.Text;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

Expand All @@ -34,6 +39,8 @@
*/
public class LakeTable extends OlapTable {

private static final Logger LOG = LogManager.getLogger(LakeTable.class);

public LakeTable(long id, String tableName, List<Column> baseSchema, KeysType keysType, PartitionInfo partitionInfo,
DistributionInfo defaultDistributionInfo, TableIndexes indexes) {
super(id, tableName, baseSchema, keysType, partitionInfo, defaultDistributionInfo,
Expand Down Expand Up @@ -82,6 +89,17 @@ public void setStorageInfo(ShardStorageInfo shardStorageInfo, boolean enableCach
.setStorageInfo(new StorageInfo(newShardStorageInfo, new StorageCacheInfo(enableCache, cacheTtlS)));
}

@Override
public OlapTable selectiveCopy(Collection<String> reservedPartitions, boolean resetState,
MaterializedIndex.IndexExtState extState) {
LakeTable copied = DeepCopy.copyWithGson(this, LakeTable.class);
if (copied == null) {
LOG.warn("failed to copy lake table: {}", getName());
return null;
}
return selectiveCopyInternal(copied, reservedPartitions, resetState, extState);
}

@Override
public void write(DataOutput out) throws IOException {
// write type first
Expand Down
17 changes: 12 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Expand Up @@ -3785,8 +3785,8 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, dbTbl.getTbl());
}

if (table.getType() != Table.TableType.OLAP) {
throw new DdlException("Only support truncate OLAP table");
if (!table.isOlapOrLakeTable()) {
throw new DdlException("Only support truncate OLAP table or LAKE table");
}

OlapTable olapTable = (OlapTable) table;
Expand Down Expand Up @@ -3892,7 +3892,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
}

// replace
truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
truncateTableInternal(olapTable, newPartitions, truncateEntireTable, false);

// write edit log
TruncateTableInfo info = new TruncateTableInfo(db.getId(), olapTable.getId(), newPartitions,
Expand All @@ -3906,7 +3906,8 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
tblRef.getName().toSql(), tblRef.getPartitionNames());
}

private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions,
boolean isEntireTable, boolean isReplay) {
// use new partitions to replace the old ones.
Set<Long> oldTabletIds = Sets.newHashSet();
for (Partition newPartition : newPartitions) {
Expand All @@ -3928,14 +3929,20 @@ private void truncateTableInternal(OlapTable olapTable, List<Partition> newParti
for (Long tabletId : oldTabletIds) {
GlobalStateMgr.getCurrentInvertedIndex().deleteTablet(tabletId);
}

// if it is lake table, need to delete shard and drop tablet
if (olapTable.isLakeTable() && !isReplay) {
stateMgr.getShardManager().getShardDeleter().addUnusedShardId(oldTabletIds);
editLog.logAddUnusedShard(oldTabletIds);
}
}

public void replayTruncateTable(TruncateTableInfo info) {
Database db = getDb(info.getDbId());
db.writeLock();
try {
OlapTable olapTable = (OlapTable) db.getTable(info.getTblId());
truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable(), true);

if (!GlobalStateMgr.isCheckpointThread()) {
// add tablet to inverted index
Expand Down