diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeSpaceQuotaManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeSpaceQuotaManager.java index c448febf3b74f..9b1ef020b063e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeSpaceQuotaManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeSpaceQuotaManager.java @@ -31,27 +31,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class DataNodeSpaceQuotaManager { private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeSpaceQuotaManager.class); - private Map spaceQuotaLimit; - private Map spaceQuotaUsage; + private ConcurrentMap spaceQuotaLimit; + private ConcurrentMap spaceQuotaUsage; private DataNodeSizeStore dataNodeSizeStore; public DataNodeSpaceQuotaManager() { - spaceQuotaLimit = new HashMap<>(); - spaceQuotaUsage = new HashMap<>(); + spaceQuotaLimit = new ConcurrentHashMap<>(); + spaceQuotaUsage = new ConcurrentHashMap<>(); dataNodeSizeStore = new DataNodeSizeStore(); recover(); } public DataNodeSpaceQuotaManager( - Map spaceQuotaLimit, Map spaceQuotaUsage) { + final ConcurrentMap spaceQuotaLimit, + final ConcurrentMap spaceQuotaUsage) { this.spaceQuotaLimit = spaceQuotaLimit; this.spaceQuotaUsage = spaceQuotaUsage; } @@ -67,87 +70,67 @@ public static DataNodeSpaceQuotaManager getInstance() { return DataNodeSpaceQuotaManagerHolder.INSTANCE; } - public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) { - for (String database : req.getDatabase()) { - spaceQuotaLimit.put(database, req.getSpaceLimit()); + public TSStatus setSpaceQuota(final TSetSpaceQuotaReq req) { + for (final String database : req.getDatabase()) { spaceQuotaUsage.put(database, new TSpaceQuota()); + spaceQuotaLimit.put(database, req.getSpaceLimit()); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } private void recover() { - TSpaceQuotaResp spaceQuota = ClusterConfigTaskExecutor.getInstance().getSpaceQuota(); + final TSpaceQuotaResp spaceQuota = ClusterConfigTaskExecutor.getInstance().getSpaceQuota(); if (spaceQuota.getStatus() != null) { if (spaceQuota.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && spaceQuota.getSpaceQuota() != null) { - for (String database : spaceQuota.getSpaceQuota().keySet()) { - spaceQuotaLimit.put(database, spaceQuota.getSpaceQuota().get(database)); + for (final String database : spaceQuota.getSpaceQuota().keySet()) { spaceQuotaUsage.put(database, new TSpaceQuota()); + spaceQuotaLimit.put(database, spaceQuota.getSpaceQuota().get(database)); } } - LOGGER.info("Space quota limit restored succeeded. " + spaceQuotaLimit.toString()); + LOGGER.info("Space quota limit restore succeeded, limit: {}.", spaceQuotaLimit); } else { - LOGGER.error("Space quota limit restored failed. " + spaceQuotaLimit.toString()); + LOGGER.error("Space quota limit restore failed, limit: {}.", spaceQuotaLimit); } } public boolean checkDeviceLimit(String database) { database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + database; - TSpaceQuota spaceQuota = spaceQuotaLimit.get(database); - if (spaceQuota == null) { - return true; - } else if (spaceQuota.getDeviceNum() == 0 || spaceQuota.getDeviceNum() == -1) { - return true; - } - long deviceNum = spaceQuotaUsage.get(database).getDeviceNum(); - if (spaceQuota.getDeviceNum() - deviceNum > 0) { - return true; - } - return false; + final TSpaceQuota spaceQuota = spaceQuotaLimit.get(database); + return spaceQuota == null + || spaceQuota.getDeviceNum() == 0 + || spaceQuota.getDeviceNum() == -1 + || spaceQuota.getDeviceNum() - spaceQuotaUsage.get(database).getDeviceNum() > 0; } - public void updateSpaceQuotaUsage(Map spaceQuotaUsage) { - this.spaceQuotaUsage = spaceQuotaUsage; + public void updateSpaceQuotaUsage(final Map spaceQuotaUsage) { + if (Objects.nonNull(spaceQuotaUsage)) { + this.spaceQuotaUsage.putAll(spaceQuotaUsage); + } } public boolean checkTimeSeriesNum(String database) { database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + database; - TSpaceQuota spaceQuota = spaceQuotaLimit.get(database); - if (spaceQuota == null) { - return true; - } else if (spaceQuota.getTimeserieNum() == 0 || spaceQuota.getTimeserieNum() == -1) { - return true; - } - long timeSeriesNum = spaceQuotaUsage.get(database).getTimeserieNum(); - if (spaceQuota.getTimeserieNum() - timeSeriesNum > 0) { - return true; - } - return false; + final TSpaceQuota spaceQuota = spaceQuotaLimit.get(database); + return spaceQuota == null + || spaceQuota.getTimeserieNum() == 0 + || spaceQuota.getTimeserieNum() == -1 + || spaceQuota.getTimeserieNum() - spaceQuotaUsage.get(database).getTimeserieNum() > 0; } public boolean checkRegionDisk(String database) { TSpaceQuota spaceQuota = spaceQuotaLimit.get(database); - if (spaceQuota == null) { - return true; - } else if (spaceQuota.getDiskSize() == 0 || spaceQuota.getDiskSize() == -1) { - return true; - } - long diskSize = spaceQuotaUsage.get(database).getDiskSize(); - if (spaceQuota.getDiskSize() - diskSize > 0) { - return true; - } - return false; + return spaceQuota == null + || spaceQuota.getDiskSize() == 0 + || spaceQuota.getDiskSize() == -1 + || spaceQuota.getDiskSize() - spaceQuotaUsage.get(database).getDiskSize() > 0; } - public void setDataRegionIds(List dataRegionIds) { + public void setDataRegionIds(final List dataRegionIds) { dataNodeSizeStore.setDataRegionIds(dataRegionIds); } public Map getRegionDisk() { return dataNodeSizeStore.getDataRegionDisk(); } - - public void setSpaceQuotaLimit(Map spaceQuotaLimit) { - this.spaceQuotaLimit = spaceQuotaLimit; - } }