Skip to content

Commit

Permalink
将生产者权重更新单独做成接口避免覆盖其他配置
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Oct 13, 2020
1 parent e5dd2fb commit c019432
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.jd.laf.web.vertx.response.Response;
import com.jd.laf.web.vertx.response.Responses;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.handler.Constants;
import org.joyqueue.handler.annotation.PageQuery;
Expand Down Expand Up @@ -165,86 +164,12 @@ public Response queryByTopic(@Body QConsumer qConsumer) throws Exception {
public Response configAddOrUpdate(@Body ConsumerConfig config) throws Exception {
if (config != null) {
Consumer consumer = service.findById(config.getConsumerId());
mergeConsumerConfig(consumer, config);
consumer.setConfig(config);
service.update(consumer);
}
return Responses.success();
}

private void mergeConsumerConfig(Consumer consumer, ConsumerConfig config) {
if (consumer.getConfig() == null) {
consumer.setConfig(config);
return;
}
if (config.getAckTimeout() !=null) {
consumer.getConfig().setAckTimeout(config.getAckTimeout());
}
if (config.getBackOffMultiplier() != null) {
consumer.getConfig().setBackOffMultiplier(config.getBackOffMultiplier());
}
if (config.getBatchSize() != null) {
consumer.getConfig().setBatchSize(config.getBatchSize());
}
if (config.getConcurrent() != null) {
consumer.getConfig().setConcurrent(config.getConcurrent());
}
if (config.getDelay() != null) {
consumer.getConfig().setDelay(config.getDelay());
}
if (config.getRetryDelay() != null) {
consumer.getConfig().setRetryDelay(config.getRetryDelay());
}
if (config.getMaxRetrys() != null) {
consumer.getConfig().setMaxRetrys(config.getMaxRetrys());
}
if (config.getMaxRetryDelay() != null) {
consumer.getConfig().setMaxRetryDelay(config.getMaxRetryDelay());
}
if (config.getLimitTps() != null) {
consumer.getConfig().setLimitTps(config.getLimitTps());
}
if (config.getLimitTraffic() != null) {
consumer.getConfig().setLimitTraffic(config.getLimitTraffic());
}
if (config.getRegion() != null) {
consumer.getConfig().setRegion(config.getRegion());
}
if (config.getBlackList() != null) {
consumer.getConfig().setBlackList(config.getBlackList());
}
if (config.getFilters() != null) {
consumer.getConfig().setFilters(config.getFilters());
}
if (config.isArchive() != null) {
consumer.getConfig().setArchive(config.isArchive());
}
if (config.isNearBy() != null) {
consumer.getConfig().setNearBy(config.isNearBy());
}
if (config.isPaused() != null) {
consumer.getConfig().setPaused(config.isPaused());
}
if (config.isRetry() != null) {
consumer.getConfig().setRetry(config.isRetry());
}
if (config.isUseExponentialBackOff() != null) {
consumer.getConfig().setUseExponentialBackOff(config.isUseExponentialBackOff());
}
if (MapUtils.isNotEmpty(config.getParams())) {
if (consumer.getConfig().getParams() == null) {
consumer.getConfig().setParams(config.getParams());
}else {
consumer.getConfig().getParams().putAll(config.getParams());
}
}
if (config.getOffsetMode() != null) {
consumer.getConfig().setOffsetMode(config.getOffsetMode());
}
if (config.getExpireTime() != null) {
consumer.getConfig().setExpireTime(config.getExpireTime());
}
}

/**
* 同步producer
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,19 @@ public Response findPartitionGroupWeight(@QueryParam(Constants.ID) String id) th
return Responses.success(currentWeights);
}

@Path("updateWeight")
public Response updateWeight(@QueryParam(Constants.ID) String id, @Body Map<String, Object> body) throws Exception {
Producer producer = service.findById(id);
if (body.containsKey("weight")) {
if (producer.getConfig() == null) {
producer.setConfig(new ProducerConfig());
}
producer.getConfig().setWeight(body.get("weight").toString());
}
service.update(producer);
return Responses.success();
}

/**
* 同步producer
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package org.joyqueue.handler.routing.command.monitor;


import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.handler.annotation.Operator;
import org.joyqueue.model.domain.Identity;
import org.joyqueue.model.domain.Producer;
Expand Down Expand Up @@ -58,57 +55,12 @@ public String type() {
public Response execute() throws Exception {
Preconditions.checkArgument(null!=producerConfig, "invalid argument");
Producer producer = producerService.findById(producerConfig.getProducerId());
mergeProducerConfig(producer, producerConfig);
producer.setConfig(producerConfig);
return Responses.success(producerService.update(producer));
}

@Override
public void clean() {
producerConfig = null;
}

private void mergeProducerConfig(Producer producer, ProducerConfig config) {
if (producer.getConfig() == null) {
producer.setConfig(config);
return;
}
if (config.getBlackList() != null) {
producer.getConfig().setBlackList(config.getBlackList());
}
if (config.getRegion() != null) {
producer.getConfig().setRegion(config.getRegion());
}
if (StringUtils.isNotBlank(config.getWeight())) {
producer.getConfig().setWeight(config.getWeight());
}
if (config.isArchive() != null) {
producer.getConfig().setArchive(config.isArchive());
}
if (config.isNearBy() != null) {
producer.getConfig().setNearBy(config.isNearBy());
}
if (config.isSingle() != null) {
producer.getConfig().setSingle(config.isSingle());
}
if (config.getQosLevel() != null) {
producer.getConfig().setQosLevel(config.getQosLevel());
}
if (config.getLimitTps() != null) {
producer.getConfig().setLimitTps(config.getLimitTps());
}
if (config.getLimitTraffic() !=null) {
producer.getConfig().setLimitTraffic(config.getLimitTraffic());
}
if (MapUtils.isNotEmpty(config.getParams())) {
if (producer.getConfig().getParams() != null) {
producer.getConfig().getParams().putAll(config.getParams());
}else {
producer.getConfig().setParams(config.getParams());
}
}
if (config.getTimeout() !=null) {
producer.getConfig().setTimeout(config.getTimeout());
}

}
}

0 comments on commit c019432

Please sign in to comment.