Skip to content

Commit

Permalink
[Feature] Support truncate lake table (#8886)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc982627271 committed Jul 20, 2022
1 parent 7907202 commit 72eee1b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/LakeTable.java
Expand Up @@ -7,20 +7,25 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DistributionInfo;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.TableIndexes;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.DdlException;
import com.starrocks.common.io.DeepCopy;
import com.starrocks.common.io.Text;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

Expand All @@ -34,6 +39,8 @@
*/
public class LakeTable extends OlapTable {

private static final Logger LOG = LogManager.getLogger(LakeTable.class);

public LakeTable(long id, String tableName, List<Column> baseSchema, KeysType keysType, PartitionInfo partitionInfo,
DistributionInfo defaultDistributionInfo, TableIndexes indexes) {
super(id, tableName, baseSchema, keysType, partitionInfo, defaultDistributionInfo,
Expand Down Expand Up @@ -82,6 +89,17 @@ public void setStorageInfo(ShardStorageInfo shardStorageInfo, boolean enableCach
.setStorageInfo(new StorageInfo(newShardStorageInfo, new StorageCacheInfo(enableCache, cacheTtlS)));
}

@Override
public OlapTable selectiveCopy(Collection<String> reservedPartitions, boolean resetState,
MaterializedIndex.IndexExtState extState) {
LakeTable copied = DeepCopy.copyWithGson(this, LakeTable.class);
if (copied == null) {
LOG.warn("failed to copy lake table: {}", getName());
return null;
}
return selectiveCopyInternal(copied, reservedPartitions, resetState, extState);
}

@Override
public void write(DataOutput out) throws IOException {
// write type first
Expand Down
17 changes: 12 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Expand Up @@ -3785,8 +3785,8 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, dbTbl.getTbl());
}

if (table.getType() != Table.TableType.OLAP) {
throw new DdlException("Only support truncate OLAP table");
if (!table.isOlapOrLakeTable()) {
throw new DdlException("Only support truncate OLAP table or LAKE table");
}

OlapTable olapTable = (OlapTable) table;
Expand Down Expand Up @@ -3892,7 +3892,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
}

// replace
truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
truncateTableInternal(olapTable, newPartitions, truncateEntireTable, false);

// write edit log
TruncateTableInfo info = new TruncateTableInfo(db.getId(), olapTable.getId(), newPartitions,
Expand All @@ -3906,7 +3906,8 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
tblRef.getName().toSql(), tblRef.getPartitionNames());
}

private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions,
boolean isEntireTable, boolean isReplay) {
// use new partitions to replace the old ones.
Set<Long> oldTabletIds = Sets.newHashSet();
for (Partition newPartition : newPartitions) {
Expand All @@ -3928,14 +3929,20 @@ private void truncateTableInternal(OlapTable olapTable, List<Partition> newParti
for (Long tabletId : oldTabletIds) {
GlobalStateMgr.getCurrentInvertedIndex().deleteTablet(tabletId);
}

// if it is lake table, need to delete shard and drop tablet
if (olapTable.isLakeTable() && !isReplay) {
stateMgr.getShardManager().getShardDeleter().addUnusedShardId(oldTabletIds);
editLog.logAddUnusedShard(oldTabletIds);
}
}

public void replayTruncateTable(TruncateTableInfo info) {
Database db = getDb(info.getDbId());
db.writeLock();
try {
OlapTable olapTable = (OlapTable) db.getTable(info.getTblId());
truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable(), true);

if (!GlobalStateMgr.isCheckpointThread()) {
// add tablet to inverted index
Expand Down

0 comments on commit 72eee1b

Please sign in to comment.