Skip to content

Commit

Permalink
Merge 99fe180 into 5ef6043
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Oct 15, 2020
2 parents 5ef6043 + 99fe180 commit 723f015
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ public class ConsumerConfig extends BaseNsrModel {
/**
* 开启就近机房消费
**/
private Boolean nearBy;
private Boolean nearBy = false;

/**
* 是否暂停消费
**/
private Boolean paused;
private Boolean paused = false;

/**
* 是否需要归档,默认不归档
**/
private Boolean archive;
private Boolean archive = false;

/**
* 是否启用重试服务,默认开启
Expand Down Expand Up @@ -75,19 +75,19 @@ public class ConsumerConfig extends BaseNsrModel {
* 最大重试次数(无限制)
**/
@Min(0)
private Integer maxRetrys;
private Integer maxRetrys = 0;

/**
* 最大重试间隔(默认5分钟)
**/
@Min(0)
private Integer maxRetryDelay;
private Integer maxRetryDelay = 0;

/**
* 重试间隔
*/
@Min(0)
private Integer retryDelay;
private Integer retryDelay = 0;

private String region;

Expand All @@ -100,13 +100,13 @@ public class ConsumerConfig extends BaseNsrModel {
* 指数系数
*/
@Min(0)
private Double backOffMultiplier;
private Double backOffMultiplier = 0D;

/**
* 过期时间(默认3天)
**/
@Min(0)
private Integer expireTime;
private Integer expireTime = 0;

/**
* 单队列并行度
Expand All @@ -120,9 +120,9 @@ public class ConsumerConfig extends BaseNsrModel {

private String filters;

private Integer limitTps;
private Integer limitTps = 0;

private Integer limitTraffic;
private Integer limitTraffic = 0;

private Map<String, String> params;

Expand All @@ -131,7 +131,7 @@ public Boolean isNearBy() {
return nearBy;
}

public void setNearBy(boolean nearBy) {
public void setNearBy(Boolean nearBy) {
this.nearBy = nearBy;
}

Expand All @@ -147,103 +147,103 @@ public Boolean isPaused() {
return paused;
}

public void setPaused(boolean paused) {
public void setPaused(Boolean paused) {
this.paused = paused;
}

public Boolean isArchive() {
return archive;
}

public void setArchive(boolean archive) {
public void setArchive(Boolean archive) {
this.archive = archive;
}

public Boolean isRetry() {
return retry;
}

public void setRetry(boolean retry) {
public void setRetry(Boolean retry) {
this.retry = retry;
}

public Integer getDelay() {
return delay;
}

public void setDelay(int delay) {
public void setDelay(Integer delay) {
this.delay = delay;
}

public Integer getAckTimeout() {
return ackTimeout;
}

public void setAckTimeout(int ackTimeout) {
public void setAckTimeout(Integer ackTimeout) {
this.ackTimeout = ackTimeout;
}

public Integer getBatchSize() {
return batchSize;
}

public void setBatchSize(int batchSize) {
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}

public Integer getMaxRetrys() {
return maxRetrys;
}

public void setMaxRetrys(int maxRetrys) {
public void setMaxRetrys(Integer maxRetrys) {
this.maxRetrys = maxRetrys;
}

public Integer getMaxRetryDelay() {
return maxRetryDelay;
}

public void setMaxRetryDelay(int maxRetryDelay) {
public void setMaxRetryDelay(Integer maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}

public Integer getRetryDelay() {
return retryDelay;
}

public void setRetryDelay(int retryDelay) {
public void setRetryDelay(Integer retryDelay) {
this.retryDelay = retryDelay;
}

public Boolean isUseExponentialBackOff() {
return useExponentialBackOff;
}

public void setUseExponentialBackOff(boolean useExponentialBackOff) {
public void setUseExponentialBackOff(Boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff;
}

public Double getBackOffMultiplier() {
return backOffMultiplier;
}

public void setBackOffMultiplier(double backOffMultiplier) {
public void setBackOffMultiplier(Double backOffMultiplier) {
this.backOffMultiplier = backOffMultiplier;
}

public Integer getExpireTime() {
return expireTime;
}

public void setExpireTime(int expireTime) {
public void setExpireTime(Integer expireTime) {
this.expireTime = expireTime;
}

public Integer getConcurrent() {
return concurrent;
}

public void setConcurrent(int concurrent) {
public void setConcurrent(Integer concurrent) {
this.concurrent = concurrent;
}

Expand All @@ -263,15 +263,15 @@ public void setFilters(String filters) {
this.filters = filters;
}

public void setLimitTps(int limitTps) {
public void setLimitTps(Integer limitTps) {
this.limitTps = limitTps;
}

public Integer getLimitTps() {
return limitTps;
}

public void setLimitTraffic(int limitTraffic) {
public void setLimitTraffic(Integer limitTraffic) {
this.limitTraffic = limitTraffic;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ProducerConfig extends BaseNsrModel {
/**
* 是否就近发送
**/
private Boolean nearBy;
private Boolean nearBy = false;

/**
* 集群实例发送权重
Expand All @@ -38,7 +38,7 @@ public class ProducerConfig extends BaseNsrModel {
/**
* 是否归档
*/
private Boolean archive;
private Boolean archive = false;

/**
* 单个发送者
Expand All @@ -47,11 +47,11 @@ public class ProducerConfig extends BaseNsrModel {

private String blackList;

private Integer limitTps;
private Integer limitTps = 0;

private Integer limitTraffic;
private Integer limitTraffic = 0;

private Integer timeout;
private Integer timeout = 2000;

private Integer qosLevel;
private String region;
Expand All @@ -70,7 +70,7 @@ public Boolean isNearBy() {
return nearBy;
}

public void setNearBy(boolean nearBy) {
public void setNearBy(Boolean nearBy) {
this.nearBy = nearBy;
}

Expand All @@ -86,15 +86,15 @@ public Boolean isSingle() {
return single;
}

public void setSingle(boolean single) {
public void setSingle(Boolean single) {
this.single = single;
}

public Boolean isArchive() {
return archive;
}

public void setArchive(boolean archive) {
public void setArchive(Boolean archive) {
this.archive = archive;
}

Expand All @@ -106,15 +106,15 @@ public void setBlackList(String blackList) {
this.blackList = blackList;
}

public void setLimitTps(int limitTps) {
public void setLimitTps(Integer limitTps) {
this.limitTps = limitTps;
}

public Integer getLimitTps() {
return limitTps;
}

public void setLimitTraffic(int limitTraffic) {
public void setLimitTraffic(Integer limitTraffic) {
this.limitTraffic = limitTraffic;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ protected Consumer backward(org.joyqueue.domain.Consumer nsrConsumer) {
consumerConfig.setBlackList(StringUtils.join(consumerPolicy.getBlackList(), ","));
consumerConfig.setAckTimeout(consumerPolicy.getAckTimeout());
consumerConfig.setArchive(consumerPolicy.getArchive());
consumerConfig.setBatchSize(consumerPolicy.getBatchSize());
if (consumerPolicy.getBatchSize() != null) {
consumerConfig.setBatchSize(Integer.parseInt(consumerPolicy.getBatchSize().toString()));
}
consumerConfig.setConcurrent(consumerPolicy.getConcurrent());
consumerConfig.setDelay(consumerPolicy.getDelay());
consumerConfig.setPaused(consumerPolicy.getPaused());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public int deleteByGroupId(long groupId) {

@Override
public Map<Long, Identity> findGroupByBrokerIds(List<Long> brokerIds) {
List<BrokerGroupRelated> brokerGroupRelateds = repository.findByBrokerIds(brokerIds);
if (CollectionUtils.isNotEmpty(brokerGroupRelateds)) {
return brokerGroupRelateds.stream()
.filter(brokerGroupRelated -> brokerGroupRelated.getGroup() != null)
.collect(Collectors.toMap(BrokerGroupRelated::getId,
BrokerGroupRelated::getGroup));
if (CollectionUtils.isNotEmpty(brokerIds)) {
List<BrokerGroupRelated> brokerGroupRelateds = repository.findByBrokerIds(brokerIds);
if (CollectionUtils.isNotEmpty(brokerGroupRelateds)) {
return brokerGroupRelateds.stream()
.filter(brokerGroupRelated -> brokerGroupRelated.getGroup() != null)
.collect(Collectors.toMap(BrokerGroupRelated::getId,
BrokerGroupRelated::getGroup));
}
}
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ protected Consumer fillConsumer(Consumer consumer) {
if (application != null) {
app.setId(application.getId());
app.setName(application.getName());
consumer.setOwner(application.getOwner());
}
return consumer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@
<route inherit="get"
path="/v1/producer/weight/:id" handlers="producer#weight render"
errors="error"/>
<route inherit="post"
path="/v1/producer/updateWeight/:id" handlers="producer#updateWeight render"
errors="error"/>

<route inherit="post"
path="/v1/producer/add" handlers="producer#add render"
Expand All @@ -311,6 +314,9 @@
<route inherit="delete"
path="/v1/producer/delete/:id" handlers="producer#delete render"
errors="error"/>
<route inherit="post"
path="/v1/producer/query-by-topic" handlers="producer#query-by-topic render"
errors="error"/>

<!-- producerConfig -->
<route inherit="post"
Expand All @@ -335,6 +341,9 @@
errors="error"/>
<route inherit="delete"
path="/v1/consumer/delete/:id" handlers="consumer#delete render"/>
<route inherit="post"
path="/v1/consumer/queryByTopic" handlers="consumer#queryByTopic render"
errors="error"/>

<route inherit="get"
path="/v1/consumer/findAllSubscribeGroups" handlers="consumer#findAllSubscribeGroups render"/>
Expand Down
Loading

0 comments on commit 723f015

Please sign in to comment.