Skip to content

Commit

Permalink
优化存储相关 (#327)
Browse files Browse the repository at this point in the history
* 调整buffer清理策略
* 自动删除不连续的文件
* 删除消息异常时重试
* 物理删除group
* 量大时快速失败,防止fullgc
* 调整默认阈值
* 调整测试用例
* 添加入队超时配置
* 修改删除group逻辑
* 修复特殊主题删除问题
  • Loading branch information
llIlll committed Dec 28, 2020
1 parent b6b2136 commit b1e5390
Show file tree
Hide file tree
Showing 19 changed files with 567 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ public class BufferPoolMonitorInfo {
private String used;
private String maxMemorySize;
private String mmpUsed;
private String mmpFd;
private String directUsed;
private String directFd;
private int directDestroy;
private int directAllocate;
private int mmapDestroy;
private int mmapAllocate;

private List<PLMonitorInfo> plMonitorInfos;

Expand Down Expand Up @@ -61,6 +67,14 @@ public void setMmpUsed(String mmpUsed) {
this.mmpUsed = mmpUsed;
}

public String getMmpFd() {
return mmpFd;
}

public void setMmpFd(String mmpFd) {
this.mmpFd = mmpFd;
}

public String getDirectUsed() {
return directUsed;
}
Expand All @@ -69,6 +83,14 @@ public void setDirectUsed(String directUsed) {
this.directUsed = directUsed;
}

public String getDirectFd() {
return directFd;
}

public void setDirectFd(String directFd) {
this.directFd = directFd;
}

public List<PLMonitorInfo> getPlMonitorInfos() {
return plMonitorInfos;
}
Expand All @@ -77,6 +99,38 @@ public void setPlMonitorInfos(List<PLMonitorInfo> plMonitorInfos) {
this.plMonitorInfos = plMonitorInfos;
}

public int getDirectDestroy() {
return directDestroy;
}

public void setDirectDestroy(int directDestroy) {
this.directDestroy = directDestroy;
}

public int getDirectAllocate() {
return directAllocate;
}

public void setDirectAllocate(int directAllocate) {
this.directAllocate = directAllocate;
}

public int getMmapDestroy() {
return mmapDestroy;
}

public void setMmapDestroy(int mmapDestroy) {
this.mmapDestroy = mmapDestroy;
}

public int getMmapAllocate() {
return mmapAllocate;
}

public void setMmapAllocate(int mmapAllocate) {
this.mmapAllocate = mmapAllocate;
}

public static class PLMonitorInfo {
private String cached;
private String usedPreLoad;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.joyqueue.toolkit.config.PropertyDef;
import org.joyqueue.toolkit.config.PropertySupplier;

import static org.joyqueue.toolkit.config.Property.APPLICATION_DATA_PATH;

/**
Expand All @@ -26,8 +27,11 @@ public class BrokerStoreConfig {
public static final String DEFAULT_CLEAN_STRATEGY_CLASS = "GlobalStorageLimitCleaningStrategy";
public static final long DEFAULT_MAX_STORE_SIZE = 10L * 1024 * 1024 * 1024; // 10gb
public static final long DEFAULT_MAX_STORE_TIME = 1000 * 60 * 60 * 24 * 7; // 7days
public static final long DEFAULT_STORE_CLEAN_SCHEDULE_BEGIN = 5 * 60 * 1000;
public static final long DEFAULT_STORE_CLEAN_SCHEDULE_END = 10 * 60 * 1000;
public static final long DEFAULT_STORE_CLEAN_SCHEDULE_BEGIN = 1 * 60 * 1000;
public static final long DEFAULT_STORE_CLEAN_SCHEDULE_END = 5 * 60 * 1000;
public static final long DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN = 1 * 60 * 1000;
public static final long DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_END = 5 * 60 * 1000;
public static final long DEFAULT_STORE_PHYSICAL_CLEAN_INTERNAL = 100;
public static final boolean DEFAULT_KEEP_UNCONSUMED = true;
public static final int DEFAULT_STORE_DISK_USAGE_MAX= 80;
public static final int DEFAULT_STORE_DISK_USAGE_SAFE=75;
Expand All @@ -49,7 +53,12 @@ public enum BrokerStoreConfigKey implements PropertyDef {
CLEAN_SCHEDULE_END("store.clean.schedule.end", DEFAULT_STORE_CLEAN_SCHEDULE_END, Type.LONG),
FORCE_RESTORE("store.force.restore", true, Type.BOOLEAN),
STORE_DISK_USAGE_MAX("store.disk.usage.max",DEFAULT_STORE_DISK_USAGE_MAX,Type.INT),
STORE_DISK_USAGE_SAFE("store.disk.usage.safe",DEFAULT_STORE_DISK_USAGE_SAFE,Type.INT);
STORE_DISK_USAGE_SAFE("store.disk.usage.safe",DEFAULT_STORE_DISK_USAGE_SAFE,Type.INT),
STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN("store.physical.clean.schedule.begin",DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN,Type.INT),
STORE_PHYSICAL_CLEAN_SCHEDULE_END("store.physical.clean.schedule.end",DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_END,Type.INT),
STORE_PHYSICAL_CLEAN_INTERVAL("store.physical.clean.interval",DEFAULT_STORE_PHYSICAL_CLEAN_INTERNAL,Type.INT),

;
private String name;
private Object value;
private Type type;
Expand Down Expand Up @@ -158,5 +167,17 @@ public int getStoreDiskUsageSafe(){
public String getApplicationDataPath(){
return propertySupplier.getOrCreateProperty(APPLICATION_DATA_PATH).getString();
}

public int getStorePhysicalCleanScheduleBegin() {
return PropertySupplier.getValue(propertySupplier, BrokerStoreConfigKey.STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN, DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN);
}

public int getStorePhysicalCleanScheduleEnd() {
return PropertySupplier.getValue(propertySupplier, BrokerStoreConfigKey.STORE_PHYSICAL_CLEAN_SCHEDULE_END, DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_END);
}

public int getStorePhysicalCleanInterval() {
return PropertySupplier.getValue(propertySupplier, BrokerStoreConfigKey.STORE_PHYSICAL_CLEAN_INTERVAL, DEFAULT_STORE_PHYSICAL_CLEAN_INTERNAL);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.joyqueue.domain.TopicName;
import org.joyqueue.monitor.BufferPoolMonitorInfo;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.RemovedPartitionGroupStore;
import org.joyqueue.store.StoreManagementService;
import org.joyqueue.store.StoreNode;
import org.joyqueue.store.StoreNodes;
Expand Down Expand Up @@ -165,6 +166,16 @@ public void removePartitionGroup(String topic, int partitionGroup) {
storeService.removePartitionGroup(topic, partitionGroup);
}

@Override
public void physicalDeleteRemovedPartitionGroup(String topic, int partitionGroup) {
storeService.physicalDeleteRemovedPartitionGroup(topic, partitionGroup);
}

@Override
public List<RemovedPartitionGroupStore> getRemovedPartitionGroups() {
return storeService.getRemovedPartitionGroups();
}

@Override
public void restorePartitionGroup(String topic, int partitionGroup) throws Exception {
storeService.restorePartitionGroup(topic, partitionGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.jd.laf.extension.ExtensionManager;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.config.BrokerStoreConfig;
import org.joyqueue.broker.consumer.position.PositionManager;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.store.RemovedPartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import com.jd.laf.extension.ExtensionManager;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,6 +58,7 @@ public class StoreCleanManager extends Service {
private PositionManager positionManager;
private Map<String, StoreCleaningStrategy> cleaningStrategyMap;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService physicalDeleteScheduledExecutorService;
private ScheduledFuture cleanFuture;

public StoreCleanManager(final PropertySupplier propertySupplier, final StoreService storeService, final ClusterManager clusterManager, final PositionManager positionManager) {
Expand All @@ -66,6 +68,7 @@ public StoreCleanManager(final PropertySupplier propertySupplier, final StoreSer
this.clusterManager = clusterManager;
this.positionManager = positionManager;
this.scheduledExecutorService = Executors.newScheduledThreadPool(SCHEDULE_EXECUTOR_THREADS, new NamedThreadFactory("StoreCleaning-Scheduled-Executor"));
this.physicalDeleteScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StoreCleaning-Physical-Scheduled-Executor"));
}

@Override
Expand Down Expand Up @@ -95,6 +98,11 @@ public void start() throws Exception {
ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStoreCleanScheduleBegin(), brokerStoreConfig.getStoreCleanScheduleEnd()),
ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStoreCleanScheduleBegin(), brokerStoreConfig.getStoreCleanScheduleEnd()),
TimeUnit.MILLISECONDS);

physicalDeleteScheduledExecutorService.scheduleWithFixedDelay(this::physicalClean,
ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStorePhysicalCleanScheduleBegin(), brokerStoreConfig.getStorePhysicalCleanScheduleEnd()),
ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStorePhysicalCleanScheduleBegin(), brokerStoreConfig.getStorePhysicalCleanScheduleEnd()),
TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -119,6 +127,26 @@ public void stop() {
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
}

physicalDeleteScheduledExecutorService.shutdownNow();
}

private void physicalClean() {
for (RemovedPartitionGroupStore removedPartitionGroup : storeService.getRemovedPartitionGroups()) {
boolean interrupted = false;
while (removedPartitionGroup.physicalDeleteLeftFile()) {
try {
Thread.currentThread().sleep(brokerStoreConfig.getStorePhysicalCleanInterval());
} catch (InterruptedException e) {
interrupted = true;
break;
}
}
if (!interrupted) {
LOG.info("Store file deleted, topic: {}, partitionGroup: {}", removedPartitionGroup.getTopic(), removedPartitionGroup.getPartitionGroup());
storeService.physicalDeleteRemovedPartitionGroup(removedPartitionGroup.getTopic(), removedPartitionGroup.getPartitionGroup());
}
}
}

private void clean() {
Expand Down Expand Up @@ -161,7 +189,7 @@ private void clean() {
}
}
} catch (Throwable t) {
LOG.error("Error to clean store for topic <{}>, partition group <{}>, exception: {}", topicConfig, partitionGroup.getGroup(), t);
LOG.error("Error to clean store for topic <{}>, partition group <{}>, exception: ", topicConfig, partitionGroup.getGroup(), t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Shorts;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.cluster.event.CompensateEvent;
import org.joyqueue.broker.config.BrokerStoreConfig;
Expand All @@ -37,12 +38,12 @@
import org.joyqueue.nsr.event.RemovePartitionGroupEvent;
import org.joyqueue.nsr.event.RemoveTopicEvent;
import org.joyqueue.nsr.event.UpdatePartitionGroupEvent;
import org.joyqueue.store.NoSuchPartitionGroupException;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.service.Service;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -306,12 +307,15 @@ protected void onUpdatePartitionGroup(TopicName topicName, PartitionGroup oldPar
electionService.onNodeAdd(topicName, newPartitionGroup.getGroup(), newPartitionGroup.getElectType(),
brokers, newPartitionGroup.getLearners(), nameService.getBroker(newReplica),
currentBrokerId, newPartitionGroup.getLeader());
storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()]));
}
}

if (oldPartitionGroup.getPartitions().size() != newPartitionGroup.getPartitions().size()) {
storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()]));
try {
storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()]));
} catch (NoSuchPartitionGroupException e) {
logger.error("rePartition exception, topic: {}, group: {}", newPartitionGroup.getTopic(), newPartitionGroup.getGroup(), e);
}
}

for (Integer oldReplica : oldPartitionGroup.getReplicas()) {
Expand All @@ -325,7 +329,6 @@ protected void onUpdatePartitionGroup(TopicName topicName, PartitionGroup oldPar
} else {
logger.info("topic[{}] update partitionGroup[{}] add node[{}] ", topicName, newPartitionGroup.getGroup(), oldReplica);
electionService.onNodeRemove(topicName, newPartitionGroup.getGroup(), oldReplica, currentBrokerId);
storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()]));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.joyqueue.monitor.BufferPoolMonitorInfo;
import org.joyqueue.nsr.NameService;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.RemovedPartitionGroupStore;
import org.joyqueue.store.StoreManagementService;
import org.joyqueue.store.StoreNode;
import org.joyqueue.store.StoreNodes;
Expand Down Expand Up @@ -57,6 +58,16 @@ public void removePartitionGroup(String topic, int partitionGroup) {

}

@Override
public void physicalDeleteRemovedPartitionGroup(String topic, int partitionGroup) {

}

@Override
public List<RemovedPartitionGroupStore> getRemovedPartitionGroups() {
return null;
}

@Override
public void restorePartitionGroup(String topic, int partitionGroup) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.joyqueue.store;

/**
* RemovedPartitionGroupStore
* author: gaohaoxiang
* date: 2020/10/28
*/
public interface RemovedPartitionGroupStore {

/**
* 获取Topic
* @return Topic
*/
String getTopic();

/**
* 获取Partition Group 序号
* @return Partition Group 序号
*/
int getPartitionGroup();

/**
* 物理删除最左文件
* @return
*/
boolean physicalDeleteLeftFile();

/**
* 物理删除所有文件,包括目录
* @return
*/
boolean physicalDelete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ public interface StoreService {
*/
void removePartitionGroup(String topic, int partitionGroup);

/**
* 物理删除已删除的group
* @param topic
* @param partitionGroup
*/
void physicalDeleteRemovedPartitionGroup(String topic, int partitionGroup);

/**
* 获取已删除的group
* @return
*/
List<RemovedPartitionGroupStore> getRemovedPartitionGroups();

/**
* 从磁盘恢复partition group,系统启动时调用
*/
Expand Down
Loading

0 comments on commit b1e5390

Please sign in to comment.