Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,9 @@ public static boolean isNotCloudMode() {
@ConfField(mutable = true)
public static int cloud_cold_read_percent = 10; // 10%

// Upper bound on the number of tablet entries CloudTabletStatMgr puts into a single
// GetTabletStatsRequest; once a request reaches this size a new one is started.
@ConfField(mutable = true)
public static int get_tablet_stat_batch_size = 1000;

// The original meta read lock is not enough to keep a snapshot of partition versions,
// so the execution of `createScanRangeLocations` are delayed to `Coordinator::exec`,
// to help to acquire a snapshot of partition versions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.rpc.RpcException;

import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

/*
* CloudTabletStatMgr periodically collects tablet statistics in cloud mode by
* querying the meta service through MetaServiceProxy.
* Each FE will collect by itself.
*/
public class CloudTabletStatMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(CloudTabletStatMgr.class);

// NOTE(review): no method of this class visible here references taskPool, yet constructing it
// allocates a ForkJoinPool sized to the number of available processors. Confirm whether it is
// still needed before keeping it.
private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The taskPool seems redundant


// <(dbId, tableId) -> CloudTableStats>
// The reference is swapped for a freshly built map at the end of every runAfterCatalogReady()
// round and handed out through getCloudTableStatsMap(). It is volatile so that a caller on a
// different thread observes the most recently published map instead of a stale reference.
private volatile ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> cloudTableStatsMap =
        new ConcurrentHashMap<>();

/**
 * Creates the daemon under the name "cloud tablet stat mgr". The run interval handed to the
 * parent constructor is Config.tablet_stat_update_interval_second converted to milliseconds.
 */
public CloudTabletStatMgr() {
    // Multiply by a long literal so the conversion is done in 64-bit arithmetic and a large
    // configured interval cannot wrap around.
    super("cloud tablet stat mgr", Config.tablet_stat_update_interval_second * 1000L);
}

/**
 * Runs one collection round:
 * <ol>
 *   <li>build batched GetTabletStatsRequest messages covering every tablet of every visible
 *       index of every OLAP table,</li>
 *   <li>send each batch through the meta service proxy and copy the returned numbers onto the
 *       replicas,</li>
 *   <li>re-aggregate per-index row counts and per-table totals and publish the new table
 *       statistics map.</li>
 * </ol>
 * A failed or rejected batch is logged and skipped; the remaining batches are still processed.
 */
@Override
protected void runAfterCatalogReady() {
    LOG.info("cloud tablet stat begin");
    long start = System.currentTimeMillis();

    List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
    for (GetTabletStatsRequest req : buildTabletStatsRequests(dbIds)) {
        fetchAndApplyTabletStats(req);
    }

    LOG.info("finished to get tablet stat of all backends. cost: {} ms",
            (System.currentTimeMillis() - start));

    // after the replicas carry fresh numbers, update index row num and table totals
    start = System.currentTimeMillis();
    this.cloudTableStatsMap = aggregateTableStats(dbIds);
    LOG.info("finished to update index row num of all databases. cost: {} ms",
            (System.currentTimeMillis() - start));
}

/**
 * Walks the given databases and packs one TabletIndexPB per tablet into requests holding at
 * most Config.get_tablet_stat_batch_size entries each. Each table is read-locked while its
 * partitions are traversed. A request may span several tables or databases.
 *
 * @param dbIds ids of the databases to scan; ids that no longer resolve are skipped
 * @return the requests to send, possibly empty
 */
private List<GetTabletStatsRequest> buildTabletStatsRequests(List<Long> dbIds) {
    List<GetTabletStatsRequest> reqList = new ArrayList<>();
    GetTabletStatsRequest.Builder builder = GetTabletStatsRequest.newBuilder();
    for (Long dbId : dbIds) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            continue;
        }

        for (Table table : db.getTables()) {
            if (table.getType() != TableType.OLAP) {
                continue;
            }

            table.readLock();
            try {
                OlapTable tbl = (OlapTable) table;
                for (Partition partition : tbl.getAllPartitions()) {
                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                        for (Long tabletId : index.getTabletIdsInOrder()) {
                            Tablet tablet = index.getTablet(tabletId);
                            TabletIndexPB.Builder tabletBuilder = TabletIndexPB.newBuilder();
                            tabletBuilder.setDbId(dbId);
                            tabletBuilder.setTableId(table.getId());
                            tabletBuilder.setIndexId(index.getId());
                            tabletBuilder.setPartitionId(partition.getId());
                            tabletBuilder.setTabletId(tablet.getId());
                            builder.addTabletIdx(tabletBuilder);

                            // Seal the current batch once it is full and start a new one.
                            if (builder.getTabletIdxCount() >= Config.get_tablet_stat_batch_size) {
                                reqList.add(builder.build());
                                builder = GetTabletStatsRequest.newBuilder();
                            }
                        }
                    }
                } // partitions
            } finally {
                table.readUnlock();
            }
        } // tables
    } // end for dbs

    // Flush the trailing, partially filled batch.
    if (builder.getTabletIdxCount() > 0) {
        reqList.add(builder.build());
    }
    return reqList;
}

/**
 * Sends one batch and, when the response status is OK, applies it via updateTabletStat.
 * An RpcException or a non-OK status is logged and the batch is dropped.
 *
 * @param req the batch to send
 */
private void fetchAndApplyTabletStats(GetTabletStatsRequest req) {
    GetTabletStatsResponse resp;
    try {
        resp = getTabletStats(req);
    } catch (RpcException e) {
        LOG.info("get tablet stats exception:", e);
        return;
    }

    if (resp.getStatus().getCode() != MetaServiceCode.OK) {
        // Previously dropped silently; leave a trace so missing statistics can be diagnosed.
        LOG.warn("get tablet stats returned code {}, skip this batch of {} tablets",
                resp.getStatus().getCode(), req.getTabletIdxList().size());
        return;
    }

    if (LOG.isDebugEnabled()) {
        // Request entries are paired with response entries by position. Stop as soon as the
        // response runs out so that a shorter response cannot raise an index error from
        // what is only diagnostic output.
        List<TabletStatsPB> stats = resp.getTabletStatsList();
        int i = 0;
        for (TabletIndexPB idx : req.getTabletIdxList()) {
            if (i >= stats.size()) {
                LOG.debug("response carries {} stats for {} requested tablets, stop pairing",
                        stats.size(), req.getTabletIdxList().size());
                break;
            }
            LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: {} size: {}",
                    idx.getDbId(), idx.getTableId(), idx.getIndexId(), idx.getTabletId(),
                    stats.get(i++).getDataSize());
        }
    }
    updateTabletStat(resp);
}

/**
 * Recomputes, for every OLAP table of the given databases, the row count of each visible index
 * and the table-wide totals. For each tablet the largest value found among its replicas is
 * used. Index row counts are written back under the table write lock; tables for which
 * writeLockIfExist() returns false are skipped.
 *
 * @param dbIds ids of the databases to scan; ids that no longer resolve are skipped
 * @return a new map from (dbId, tableId) to the aggregated statistics
 */
private ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> aggregateTableStats(List<Long> dbIds) {
    ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> newCloudTableStatsMap = new ConcurrentHashMap<>();
    for (Long dbId : dbIds) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            continue;
        }

        for (Table table : db.getTables()) {
            if (table.getType() != TableType.OLAP) {
                continue;
            }
            OlapTable olapTable = (OlapTable) table;

            String dbName = db.getName();
            long tableId = table.getId();
            String tableName = table.getName();

            // Primitive accumulators: these are updated once per tablet, so boxed Long
            // values would be re-allocated on every addition.
            long tableDataSize = 0L;
            long tableRowsetCount = 0L;
            long tableSegmentCount = 0L;
            long tableRowCount = 0L;

            if (!table.writeLockIfExist()) {
                continue;
            }

            try {
                for (Partition partition : olapTable.getAllPartitions()) {
                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                        long indexRowCount = 0L;
                        for (Tablet tablet : index.getTablets()) {
                            long tabletDataSize = 0L;
                            long tabletRowsetCount = 0L;
                            long tabletSegmentCount = 0L;
                            long tabletRowCount = 0L;

                            // Take the maximum of each metric across the tablet's replicas.
                            for (Replica replica : tablet.getReplicas()) {
                                tabletDataSize = Math.max(tabletDataSize, replica.getDataSize());
                                tabletRowsetCount = Math.max(tabletRowsetCount, replica.getRowsetCount());
                                tabletSegmentCount = Math.max(tabletSegmentCount, replica.getSegmentCount());
                                tabletRowCount = Math.max(tabletRowCount, replica.getRowCount());
                            }

                            tableDataSize += tabletDataSize;
                            tableRowsetCount += tabletRowsetCount;
                            tableSegmentCount += tabletSegmentCount;
                            tableRowCount += tabletRowCount;

                            indexRowCount += tabletRowCount;
                        } // end for tablets
                        index.setRowCount(indexRowCount);
                    } // end for indices
                } // end for partitions
                LOG.debug("finished to set row num for table: {} in database: {}",
                        table.getName(), db.getFullName());
            } finally {
                table.writeUnlock();
            }

            newCloudTableStatsMap.put(Pair.of(dbId, tableId), new CloudTableStats(dbName, tableName,
                    tableDataSize, tableRowsetCount, tableSegmentCount, tableRowCount));
        }
    }
    return newCloudTableStatsMap;
}

/**
 * Copies every tablet statistic in the response onto the first replica registered for that
 * tablet in the inverted index. Entries whose tablet has no meta in the inverted index, or
 * whose replica list is null, empty, or starts with a null element, are ignored.
 *
 * @param response a meta service response whose tablet stats should be applied
 */
private void updateTabletStat(GetTabletStatsResponse response) {
    TabletInvertedIndex tabletIndex = Env.getCurrentInvertedIndex();
    for (TabletStatsPB tabletStat : response.getTabletStatsList()) {
        long tabletId = tabletStat.getIdx().getTabletId();
        if (tabletIndex.getTabletMeta(tabletId) == null) {
            continue;
        }

        List<Replica> tabletReplicas = tabletIndex.getReplicasByTabletId(tabletId);
        if (tabletReplicas == null || tabletReplicas.isEmpty()) {
            continue;
        }

        Replica firstReplica = tabletReplicas.get(0);
        if (firstReplica == null) {
            continue;
        }
        firstReplica.updateCloudStat(tabletStat.getDataSize(), tabletStat.getNumRowsets(),
                tabletStat.getNumSegments(), tabletStat.getNumRows());
    }
}

/**
 * Forwards the request to the meta service through MetaServiceProxy.
 *
 * @param request the batch of tablet indexes to query
 * @return the response produced by the proxy
 * @throws RpcException if the proxy call fails; the exception is logged here and rethrown
 */
private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest request)
        throws RpcException {
    try {
        return MetaServiceProxy.getInstance().getTabletStats(request);
    } catch (RpcException rpcFailure) {
        LOG.info("get tablet stat get exception:", rpcFailure);
        throw rpcFailure;
    }
}

/**
 * Returns the map currently held by this manager, keyed by (dbId, tableId). The internal map
 * object itself is returned, not a copy.
 *
 * @return the current table statistics map
 */
public ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> getCloudTableStatsMap() {
    return cloudTableStatsMap;
}

/**
 * Immutable holder for the aggregated statistics of one table. All fields are assigned once in
 * the constructor and exposed through Lombok-generated getters.
 */
@Getter
public static class CloudTableStats {
    private final String dbName;
    private final String tableName;

    private final Long tableDataSize;
    private final Long tableRowsetCount;
    private final Long tableSegmentCount;
    private final Long tableRowCount;

    /**
     * @param dbName name of the database owning the table
     * @param tableName name of the table
     * @param tableDataSize aggregated data size of the table
     * @param tableRowsetCount aggregated rowset count of the table
     * @param tableSegmentCount aggregated segment count of the table
     * @param tableRowCount aggregated row count of the table
     */
    public CloudTableStats(String dbName, String tableName, Long tableDataSize, Long tableRowsetCount,
            Long tableSegmentCount, Long tableRowCount) {
        this.dbName = dbName;
        this.tableName = tableName;
        this.tableDataSize = tableDataSize;
        this.tableRowsetCount = tableRowsetCount;
        this.tableSegmentCount = tableSegmentCount;
        this.tableRowCount = tableRowCount;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public String getFullName() {
return fullQualifiedName;
}

/**
 * Derives a short name from fullQualifiedName by splitting it on ':'.
 *
 * @return the second piece when the split yields exactly two pieces, otherwise the first piece
 */
public String getName() {
    String[] pieces = fullQualifiedName.split(":");
    if (pieces.length == 2) {
        return pieces[1];
    }
    return pieces[0];
}

public void setNameWithLock(String newName) {
writeLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ public class Env {

private TabletStatMgr tabletStatMgr;

private CloudTabletStatMgr cloudTabletStatMgr;

private Auth auth;
private AccessControllerManager accessManager;

Expand Down Expand Up @@ -691,6 +693,7 @@ public Env(boolean isCheckpointCatalog) {
this.globalTransactionMgr = EnvFactory.getInstance().createGlobalTransactionMgr(this);

this.tabletStatMgr = new TabletStatMgr();
this.cloudTabletStatMgr = new CloudTabletStatMgr();

this.auth = new Auth();
this.accessManager = new AccessControllerManager(auth);
Expand Down Expand Up @@ -1669,7 +1672,11 @@ protected void startMasterOnlyDaemonThreads() {
private void startNonMasterDaemonThreads() {
// start load manager thread
loadManager.start();
tabletStatMgr.start();
if (Config.isNotCloudMode()) {
tabletStatMgr.start();
} else {
cloudTabletStatMgr.start();
}
// load and export job label cleaner thread
labelCleaner.start();
// es repository
Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public static class ReplicaContext {
*/
private long preWatermarkTxnId = -1;
private long postWatermarkTxnId = -1;
private long segmentCount = 0L;
private long rowsetCount = 0L;

private long userDropTime = -1;

Expand Down Expand Up @@ -258,6 +260,14 @@ public long getRowCount() {
return rowCount;
}

/**
 * @return the segment count currently stored on this replica
 */
public long getSegmentCount() {
    return this.segmentCount;
}

/**
 * @return the rowset count currently stored on this replica
 */
public long getRowsetCount() {
    return this.rowsetCount;
}

/**
 * @return the last failed version currently stored on this replica
 */
public long getLastFailedVersion() {
    return this.lastFailedVersion;
}
Expand Down Expand Up @@ -351,6 +361,13 @@ public synchronized void updateStat(long dataSize, long remoteDataSize, long row
this.versionCount = versionCount;
}

/**
 * Overwrites the cloud-mode statistics of this replica while holding the replica's monitor.
 *
 * @param dataSize new data size
 * @param rowsetNum new rowset count
 * @param segmentNum new segment count
 * @param rowNum new row count
 */
public synchronized void updateCloudStat(long dataSize, long rowsetNum, long segmentNum, long rowNum) {
    this.rowCount = rowNum;
    this.segmentCount = segmentNum;
    this.rowsetCount = rowsetNum;
    this.dataSize = dataSize;
}

public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize,
long newRowCount) {
updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize,
Expand Down