Skip to content

Commit

Permalink
Merge 42ed2cd into e1e9aca
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed May 7, 2023
2 parents e1e9aca + 42ed2cd commit 4c1d702
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.common.EventHttpHandler;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.DefaultEventMeshRecommendStrategyImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendStrategy;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
return;
}

EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
EventMeshRecommendStrategy eventMeshRecommendStrategy = new DefaultEventMeshRecommendStrategyImpl(eventMeshTCPServer);
String recommendEventMeshResult = eventMeshRecommendStrategy.calculateRecommendEventMesh(group, purpose);
result = (recommendEventMeshResult == null) ? "null" : recommendEventMeshResult;
log.info("recommend eventmesh:{},group:{},purpose:{}", result, group, purpose);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.DefaultEventMeshRecommendStrategyImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;

Expand Down Expand Up @@ -179,7 +179,7 @@ private void doRedirect(String group, String purpose, int judge, List<String> ev
private List<String> selectRedirectEventMesh(String group, Map<String, String> eventMeshMap,
Map<String, Integer> clientDistributionMap, int judge,
String eventMeshName) throws Exception {
EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
EventMeshRecommendStrategy eventMeshRecommendStrategy = new DefaultEventMeshRecommendStrategyImpl(eventMeshTCPServer);

Check warning on line 182 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java#L182

Added line #L182 was not covered by tests
return eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap,
group, judge, eventMeshName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventMeshRecommendImpl implements EventMeshRecommendStrategy {
public class DefaultEventMeshRecommendStrategyImpl implements EventMeshRecommendStrategy {

private static final int DEFAULT_PROXY_NUM = 1;

private final transient EventMeshTCPServer eventMeshTCPServer;

public EventMeshRecommendImpl(final EventMeshTCPServer eventMeshTCPServer) {
public DefaultEventMeshRecommendStrategyImpl(final EventMeshTCPServer eventMeshTCPServer) {

Check warning on line 44 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L44

Added line #L44 was not covered by tests
this.eventMeshTCPServer = eventMeshTCPServer;
}

Expand Down Expand Up @@ -69,8 +69,8 @@ public String calculateRecommendEventMesh(final String group, final String purpo

if (CollectionUtils.isEmpty(eventMeshDataInfoList)) {
if (log.isWarnEnabled()) {
log.warn("EventMeshRecommend failed,not find eventMesh instances from registry,cluster:{},group:{},purpose:{}",
cluster, group, purpose);
log.warn("EventMeshRecommend failed,not find eventMesh instances from registry,cluster:{},group:{},purpose:{}", cluster, group,

Check warning on line 72 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L72

Added line #L72 was not covered by tests
purpose);
}
return null;
}
Expand Down Expand Up @@ -113,7 +113,7 @@ public List<String> calculateRedirectRecommendEventMesh(final Map<String, String
Objects.requireNonNull(clientDistributedMap, "clientDistributedMap can not be null");

if (recommendProxyNum < DEFAULT_PROXY_NUM || MapUtils.isEmpty(clientDistributedMap)) {
return new ArrayList<String>();
return new ArrayList<>(0);

Check warning on line 116 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L116

Added line #L116 was not covered by tests
}

if (log.isInfoEnabled()) {
Expand Down Expand Up @@ -145,29 +145,27 @@ public List<String> calculateRedirectRecommendEventMesh(final Map<String, String
}

if (log.isInfoEnabled()) {
log.info("choose proxys with min instance num, group:{}, recommendProxyNum:{}, recommendProxyList:{}",
log.info("choose proxies with min instance num, group:{}, recommendProxyNum:{}, recommendProxyList:{}",

Check warning on line 148 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L148

Added line #L148 was not covered by tests
group, recommendProxyNum, recommendProxyList);
}
return recommendProxyList;
}

private String recommendProxyByDistributeData(final String cluster, final String group, final String purpose,
final Map<String, String> eventMeshMap, final boolean caculateLocal) {
final Map<String, String> eventMeshMap, final boolean calculateLocal) {
Objects.requireNonNull(eventMeshMap, "eventMeshMap can not be null");

if (log.isInfoEnabled()) {
log.info("eventMeshMap:{},cluster:{},group:{},purpose:{},caculateLocal:{}", eventMeshMap, cluster,
group, purpose, caculateLocal);
log.info("eventMeshMap:{},cluster:{},group:{},purpose:{},calculateLocal:{}", eventMeshMap, cluster, group, purpose, calculateLocal);

Check warning on line 159 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L159

Added line #L159 was not covered by tests
}

Map<String, Map<String, Integer>> eventMeshClientDistributionDataMap = null;
try {
eventMeshClientDistributionDataMap = eventMeshTCPServer.getRegistry().findEventMeshClientDistributionData(
cluster, group, purpose);
eventMeshClientDistributionDataMap = eventMeshTCPServer.getRegistry().findEventMeshClientDistributionData(cluster, group, purpose);

Check warning on line 164 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L164

Added line #L164 was not covered by tests
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("EventMeshRecommend failed,findEventMeshClientDistributionData failed,"
+ "cluster:{},group:{},purpose:{}, errMsg:{}", cluster, group, purpose, e);
log.warn("EventMeshRecommend failed,findEventMeshClientDistributionData failed,cluster:{},group:{},purpose:{}, errMsg:{}", cluster,

Check warning on line 167 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L167

Added line #L167 was not covered by tests
group, purpose, e);
}
}

Expand All @@ -181,8 +179,8 @@ private String recommendProxyByDistributeData(final String cluster, final String
Collections.shuffle(tmpProxyAddrList);
recommendProxyAddr = tmpProxyAddrList.get(0);
if (log.isInfoEnabled()) {
log.info("No distribute data in registry,cluster:{}, group:{},purpose:{}, recommendProxyAddr:{}",
cluster, group, purpose, recommendProxyAddr);
log.info("No distribute data in registry,cluster:{}, group:{},purpose:{}, recommendProxyAddr:{}", cluster, group, purpose,

Check warning on line 182 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L182

Added line #L182 was not covered by tests
recommendProxyAddr);
}
return recommendProxyAddr;
}
Expand All @@ -205,20 +203,18 @@ private String recommendProxyByDistributeData(final String cluster, final String
}
});

recommendProxyAddr = recommendProxy(eventMeshMap, (caculateLocal == true) ? localClientDistributionMap
: remoteClientDistributionMap, group);
recommendProxyAddr = recommendProxy(eventMeshMap, (calculateLocal == true) ? localClientDistributionMap : remoteClientDistributionMap, group);

if (log.isInfoEnabled()) {
log.info("eventMeshMap:{},group:{},purpose:{},caculateLocal:{},recommendProxyAddr:{}", eventMeshMap,
group, purpose, caculateLocal, recommendProxyAddr);
log.info("eventMeshMap:{},group:{},purpose:{},caculateLocal:{},recommendProxyAddr:{}", eventMeshMap, group, purpose, calculateLocal,

Check warning on line 209 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/DefaultEventMeshRecommendStrategyImpl.java#L209

Added line #L209 was not covered by tests
recommendProxyAddr);
}

return recommendProxyAddr;
}

private String recommendProxy(final Map<String, String> eventMeshMap,
final Map<String, Integer> clientDistributionMap,
final String group) {
private String recommendProxy(final Map<String, String> eventMeshMap, final Map<String, Integer> clientDistributionMap, final String group) {

Objects.requireNonNull(eventMeshMap, "eventMeshMap can not be null");
Objects.requireNonNull(clientDistributionMap, "clientDistributionMap can not be null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.DefaultEventMeshRecommendStrategyImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendStrategy;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -55,7 +55,7 @@ public void run() {
UserAgent user = (UserAgent) pkg.getBody();
validateUserAgent(user);
String group = getGroupOfClient(user);
EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
EventMeshRecommendStrategy eventMeshRecommendStrategy = new DefaultEventMeshRecommendStrategyImpl(eventMeshTCPServer);

Check warning on line 58 in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/RecommendTask.java#L58

Added line #L58 was not covered by tests
String eventMeshRecommendResult = eventMeshRecommendStrategy.calculateRecommendEventMesh(group, user.getPurpose());
res.setHeader(new Header(RECOMMEND_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(),
pkg.getHeader().getSeq()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.eventmesh.runtime.admin.controller.HttpHandlerManager;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.DefaultEventMeshRecommendStrategyImpl;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -68,7 +68,7 @@ public void testHandle() throws Exception {
// case 1: normal case
tcpConfiguration.setEventMeshServerRegistryEnable(true);
when(httpExchange.getResponseBody()).thenReturn(outputStream);
try (MockedConstruction<EventMeshRecommendImpl> ignored = mockConstruction(EventMeshRecommendImpl.class,
try (MockedConstruction<DefaultEventMeshRecommendStrategyImpl> ignored = mockConstruction(DefaultEventMeshRecommendStrategyImpl.class,
(mock, context) -> when(mock.calculateRecommendEventMesh(anyString(), anyString())).thenReturn(returnValue))) {
handler.handle(httpExchange);
String response = outputStream.toString();
Expand All @@ -90,11 +90,11 @@ public void testHandle() throws Exception {
outputStream = mock(ByteArrayOutputStream.class);
doThrow(new IOException()).when(outputStream).close();
when(httpExchange.getResponseBody()).thenReturn(outputStream);
try (MockedConstruction<EventMeshRecommendImpl> ignored = mockConstruction(EventMeshRecommendImpl.class,
try (MockedConstruction<DefaultEventMeshRecommendStrategyImpl> ignored = mockConstruction(DefaultEventMeshRecommendStrategyImpl.class,
(mock, context) -> when(mock.calculateRecommendEventMesh(anyString(), anyString())).thenReturn(returnValue))) {
handler.handle(httpExchange);
String response = outputStream.toString();
Assert.assertNotEquals(returnValue, response);
}
}
}
}

0 comments on commit 4c1d702

Please sign in to comment.