Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DataNodeSpaceQuotaManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeSpaceQuotaManager.class);

private Map<String, TSpaceQuota> spaceQuotaLimit;
private Map<String, TSpaceQuota> spaceQuotaUsage;
private ConcurrentMap<String, TSpaceQuota> spaceQuotaLimit;
private ConcurrentMap<String, TSpaceQuota> spaceQuotaUsage;
private DataNodeSizeStore dataNodeSizeStore;

public DataNodeSpaceQuotaManager() {
spaceQuotaLimit = new HashMap<>();
spaceQuotaUsage = new HashMap<>();
spaceQuotaLimit = new ConcurrentHashMap<>();
spaceQuotaUsage = new ConcurrentHashMap<>();
dataNodeSizeStore = new DataNodeSizeStore();
recover();
}

public DataNodeSpaceQuotaManager(
Map<String, TSpaceQuota> spaceQuotaLimit, Map<String, TSpaceQuota> spaceQuotaUsage) {
final ConcurrentMap<String, TSpaceQuota> spaceQuotaLimit,
final ConcurrentMap<String, TSpaceQuota> spaceQuotaUsage) {
this.spaceQuotaLimit = spaceQuotaLimit;
this.spaceQuotaUsage = spaceQuotaUsage;
}
Expand All @@ -67,87 +70,67 @@ public static DataNodeSpaceQuotaManager getInstance() {
return DataNodeSpaceQuotaManagerHolder.INSTANCE;
}

public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
for (String database : req.getDatabase()) {
spaceQuotaLimit.put(database, req.getSpaceLimit());
public TSStatus setSpaceQuota(final TSetSpaceQuotaReq req) {
for (final String database : req.getDatabase()) {
spaceQuotaUsage.put(database, new TSpaceQuota());
spaceQuotaLimit.put(database, req.getSpaceLimit());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}

private void recover() {
TSpaceQuotaResp spaceQuota = ClusterConfigTaskExecutor.getInstance().getSpaceQuota();
final TSpaceQuotaResp spaceQuota = ClusterConfigTaskExecutor.getInstance().getSpaceQuota();
if (spaceQuota.getStatus() != null) {
if (spaceQuota.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& spaceQuota.getSpaceQuota() != null) {
for (String database : spaceQuota.getSpaceQuota().keySet()) {
spaceQuotaLimit.put(database, spaceQuota.getSpaceQuota().get(database));
for (final String database : spaceQuota.getSpaceQuota().keySet()) {
spaceQuotaUsage.put(database, new TSpaceQuota());
spaceQuotaLimit.put(database, spaceQuota.getSpaceQuota().get(database));
}
}
LOGGER.info("Space quota limit restored succeeded. " + spaceQuotaLimit.toString());
LOGGER.info("Space quota limit restore succeeded, limit: {}.", spaceQuotaLimit);
} else {
LOGGER.error("Space quota limit restored failed. " + spaceQuotaLimit.toString());
LOGGER.error("Space quota limit restore failed, limit: {}.", spaceQuotaLimit);
}
}

public boolean checkDeviceLimit(String database) {
database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + database;
TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
if (spaceQuota == null) {
return true;
} else if (spaceQuota.getDeviceNum() == 0 || spaceQuota.getDeviceNum() == -1) {
return true;
}
long deviceNum = spaceQuotaUsage.get(database).getDeviceNum();
if (spaceQuota.getDeviceNum() - deviceNum > 0) {
return true;
}
return false;
final TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
return spaceQuota == null
|| spaceQuota.getDeviceNum() == 0
|| spaceQuota.getDeviceNum() == -1
|| spaceQuota.getDeviceNum() - spaceQuotaUsage.get(database).getDeviceNum() > 0;
}

public void updateSpaceQuotaUsage(Map<String, TSpaceQuota> spaceQuotaUsage) {
this.spaceQuotaUsage = spaceQuotaUsage;
public void updateSpaceQuotaUsage(final Map<String, TSpaceQuota> spaceQuotaUsage) {
if (Objects.nonNull(spaceQuotaUsage)) {
this.spaceQuotaUsage.putAll(spaceQuotaUsage);
}
}

public boolean checkTimeSeriesNum(String database) {
database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + database;
TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
if (spaceQuota == null) {
return true;
} else if (spaceQuota.getTimeserieNum() == 0 || spaceQuota.getTimeserieNum() == -1) {
return true;
}
long timeSeriesNum = spaceQuotaUsage.get(database).getTimeserieNum();
if (spaceQuota.getTimeserieNum() - timeSeriesNum > 0) {
return true;
}
return false;
final TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
return spaceQuota == null
|| spaceQuota.getTimeserieNum() == 0
|| spaceQuota.getTimeserieNum() == -1
|| spaceQuota.getTimeserieNum() - spaceQuotaUsage.get(database).getTimeserieNum() > 0;
}

public boolean checkRegionDisk(String database) {
TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
if (spaceQuota == null) {
return true;
} else if (spaceQuota.getDiskSize() == 0 || spaceQuota.getDiskSize() == -1) {
return true;
}
long diskSize = spaceQuotaUsage.get(database).getDiskSize();
if (spaceQuota.getDiskSize() - diskSize > 0) {
return true;
}
return false;
return spaceQuota == null
|| spaceQuota.getDiskSize() == 0
|| spaceQuota.getDiskSize() == -1
|| spaceQuota.getDiskSize() - spaceQuotaUsage.get(database).getDiskSize() > 0;
}

public void setDataRegionIds(List<Integer> dataRegionIds) {
public void setDataRegionIds(final List<Integer> dataRegionIds) {
dataNodeSizeStore.setDataRegionIds(dataRegionIds);
}

public Map<Integer, Long> getRegionDisk() {
return dataNodeSizeStore.getDataRegionDisk();
}

public void setSpaceQuotaLimit(Map<String, TSpaceQuota> spaceQuotaLimit) {
this.spaceQuotaLimit = spaceQuotaLimit;
}
}