Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-499] Add configure store #386

Merged
merged 1 commit into from
Jan 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,13 @@ public static byte[] buildTestData(int bodySize) {
dataBuffer.flip();
return dataBuffer.array();
}

// get the middle data between min, max, and data
public static int mid(int data, int min, int max) {
return Math.max(min, Math.min(max, data));
}

public static long mid(long data, long min, long max) {
return Math.max(min, Math.min(max, data));
}
}
18 changes: 18 additions & 0 deletions tubemq-core/src/main/proto/MasterService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,24 @@ message MasterBrokerAuthorizedInfo {
optional string authAuthorizedToken = 2;
}

message ApprovedClientConfig {
required int64 configId = 1;
optional int32 maxMsgSize = 2;
}

message ClusterDefConfig {
required int64 configId = 1;
optional int32 maxMsgSize = 2;
}

message RegisterRequestP2M {
required string clientId = 1;
repeated string topicList = 2;
required int64 brokerCheckSum = 3;
required string hostName = 4;
optional MasterCertificateInfo authInfo = 5;
optional string jdkVersion = 6;
optional ApprovedClientConfig appdConfig = 7;
}

message RegisterResponseM2P {
Expand All @@ -77,6 +88,7 @@ message RegisterResponseM2P {
required int64 brokerCheckSum = 4;
repeated string brokerInfos = 5;
optional MasterAuthorizedInfo authorizedInfo = 6;
optional ApprovedClientConfig appdConfig = 7;
}

message HeartRequestP2M {
Expand All @@ -85,6 +97,7 @@ message HeartRequestP2M {
required string hostName = 3;
repeated string topicList = 4;
optional MasterCertificateInfo authInfo = 5;
optional ApprovedClientConfig appdConfig = 6;
}

message HeartResponseM2P {
Expand All @@ -97,6 +110,7 @@ message HeartResponseM2P {
repeated string brokerInfos = 6;
optional bool requireAuth = 7;
optional MasterAuthorizedInfo authorizedInfo = 8;
optional ApprovedClientConfig appdConfig = 9;
}

message CloseRequestP2M{
Expand Down Expand Up @@ -208,6 +222,7 @@ message RegisterRequestB2M {
optional int32 qryPriorityId = 12;
optional int32 tlsPort = 13;
optional MasterCertificateInfo authInfo = 14;
optional ClusterDefConfig clsDefConfig = 15;
}

message RegisterResponseM2B {
Expand All @@ -230,6 +245,7 @@ message RegisterResponseM2B {
optional int32 qryPriorityId = 15;
optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated */
optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17;
optional ClusterDefConfig clsDefConfig = 18;
}

message HeartRequestB2M {
Expand All @@ -250,6 +266,7 @@ message HeartRequestB2M {
optional int64 flowCheckId = 13;
optional int32 qryPriorityId = 14;
optional MasterCertificateInfo authInfo = 15;
optional ClusterDefConfig clsDefConfig = 16;
}

message HeartResponseM2B {
Expand All @@ -275,6 +292,7 @@ message HeartResponseM2B {
optional int32 qryPriorityId = 17;
optional MasterAuthorizedInfo authorizedInfo = 18; /* Deprecated */
optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19;
optional ClusterDefConfig clsDefConfig = 20;
}

message CloseRequestB2M {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public final class TServerConstants {
public static final String TOKEN_JOB_TOPICS = "topics";
public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
public static final String TOKEN_DEFAULT_FLOW_CONTROL = "default_master_ctrl";
public static final String TOKEN_DEFAULT_CLUSTER_SETTING = "default_cluster_config";

public static final String TOKEN_BLANK_FILTER_CONDITION = ",,";

public static final int CFG_MODAUTHTOKEN_MAX_LENGTH = 128;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
Expand Down Expand Up @@ -103,4 +104,10 @@ boolean putBdbConsumeGroupSettingEntity(BdbConsumeGroupSettingEntity offsetReset
boolean isNew);

ConcurrentHashMap<String, BdbConsumeGroupSettingEntity> getConsumeGroupSettingMap();

boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew);

boolean delBdbClusterConfEntity();

ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.Server;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
import org.apache.tubemq.server.master.MasterConfig;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
Expand All @@ -72,6 +74,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Bdb store service
* like a local database manager, according to database table name, store instance, primary key, memory cache
Expand All @@ -80,6 +83,7 @@
public class DefaultBdbStoreService implements BdbStoreService, Server {
private static final Logger logger = LoggerFactory.getLogger(DefaultBdbStoreService.class);

private static final String BDB_CLUSTER_SETTING_STORE_NAME = "bdbClusterSetting";
private static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig";
private static final String BDB_BROKER_CONFIG_STORE_NAME = "bdbBrokerConfig";
private static final String BDB_CONSUMER_GROUP_STORE_NAME = "bdbConsumerGroup";
Expand Down Expand Up @@ -152,6 +156,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
private PrimaryIndex<String/* recordKey */, BdbConsumeGroupSettingEntity> consumeGroupSettingIndex;
private ConcurrentHashMap<String/* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
new ConcurrentHashMap<>();
// cluster default setting store
private EntityStore clusterDefSettingStore;
private PrimaryIndex<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingIndex;
private ConcurrentHashMap<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingMap =
new ConcurrentHashMap<>();
// service status
private AtomicBoolean isStarted = new AtomicBoolean(false);
// master role flag
Expand Down Expand Up @@ -386,6 +395,14 @@ public void stop() throws Exception {
logger.error("[BDB Error] Close groupFlowCtrlStore error ", e);
}
}
if (clusterDefSettingStore != null) {
try {
clusterDefSettingStore.close();
clusterDefSettingStore = null;
} catch (Throwable e) {
logger.error("[BDB Error] Close clusterDefSettingStore error ", e);
}
}
/* evn close */
if (repEnv != null) {
try {
Expand Down Expand Up @@ -769,6 +786,39 @@ public boolean delBdbConsumeGroupSettingEntity(String consumeGroupName) {
return true;
}

/**
* Put cluster default setting bdb entity
*
* @param clusterConfEntity
* @param isNew
* @return
*/
@Override
public boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew) {
BdbClusterSettingEntity result = null;
try {
result = clusterDefSettingIndex.put(clusterConfEntity);
} catch (Throwable e) {
logger.error("[BDB Error] Put ClusterConfEntity Error ", e);
return false;
}
if (isNew) {
return result == null;
}
return result != null;
}

@Override
public boolean delBdbClusterConfEntity() {
try {
clusterDefSettingIndex.delete(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
} catch (Throwable e) {
logger.error("[BDB Error] delBdbClusterConfEntity Error ", e);
return false;
}
return true;
}

@Override
public ConcurrentHashMap<String,
ConcurrentHashMap<String, BdbConsumerGroupEntity>> getConsumerGroupNameAccControlMap() {
Expand Down Expand Up @@ -797,6 +847,11 @@ public ConcurrentHashMap<String, BdbConsumeGroupSettingEntity> getConsumeGroupSe
return this.consumeGroupSettingMap;
}

@Override
public ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap() {
return this.clusterDefSettingMap;
}

/**
* Get master group status
*
Expand Down Expand Up @@ -977,6 +1032,10 @@ private void initMetaStore() {
new EntityStore(repEnv, BDB_CONSUME_GROUP_SETTING_STORE_NAME, storeConfig);
consumeGroupSettingIndex =
consumeGroupSettingStore.getPrimaryIndex(String.class, BdbConsumeGroupSettingEntity.class);
clusterDefSettingStore =
new EntityStore(repEnv, BDB_CLUSTER_SETTING_STORE_NAME, storeConfig);
clusterDefSettingIndex =
clusterDefSettingStore.getPrimaryIndex(String.class, BdbClusterSettingEntity.class);
}

/**
Expand Down Expand Up @@ -1394,6 +1453,41 @@ private void loadConsumeGroupSettingUnits() throws Exception {
logger.info("loadConsumeGroupSettingUnits successfully...");
}


private void loadClusterDefSettingUnits() throws Exception {
long count = 0L;
EntityCursor<BdbClusterSettingEntity> cursor = null;
logger.info("loadClusterDefSettingUnits start...");
try {
cursor = clusterDefSettingIndex.entities();
clusterDefSettingMap.clear();
StringBuilder sBuilder = logger.isDebugEnabled() ? new StringBuilder(512) : null;
logger.debug("[loadClusterDefSettingUnits] Load consumer group begin:");
for (BdbClusterSettingEntity bdbEntity : cursor) {
if (bdbEntity == null) {
logger.warn("[BDB Error] Found Null data while loading from clusterDefSettingIndex!");
continue;
}
clusterDefSettingMap.put(bdbEntity.getRecordKey(), bdbEntity);
count++;
if (logger.isDebugEnabled()) {
logger.debug(bdbEntity.toJsonString(sBuilder).toString());
sBuilder.delete(0, sBuilder.length());
}
}
logger.debug("[loadClusterDefSettingUnits] Load consumer group finished!");
logger.info("[loadClusterDefSettingUnits] total load records are {}", count);
} catch (Exception e) {
logger.error("[loadClusterDefSettingUnits error] ", e);
throw e;
} finally {
if (cursor != null) {
cursor.close();
}
}
logger.info("loadClusterDefSettingUnits successfully...");
}

public class Listener implements StateChangeListener {
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
Expand Down Expand Up @@ -1424,6 +1518,7 @@ public void run() {
if (!isMaster) {
try {
clearCachedRunData();
loadClusterDefSettingUnits();
loadBrokerConfUnits();
loadTopicConfUnits();
loadGroupFlowCtrlUnits();
Expand Down
Loading