Skip to content

Commit

Permalink
Enhance to remove double filter service info for push callback. (#8574)
Browse files Browse the repository at this point in the history
* Enhance to remove double filter service info for push callback.

* Enhance to remove double filter service info for push callback.
  • Loading branch information
KomachiSion committed Jun 16, 2022
1 parent a7d8066 commit ed81d3c
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.alibaba.nacos.naming.push.v2.executor;

import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.task.NamingPushCallback;

/**
* Nacos naming push executor for v2.
Expand All @@ -44,5 +44,5 @@ public interface PushExecutor {
* @param data push data
* @param callBack callback
*/
void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack);
void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, NamingPushCallback callBack);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.alibaba.nacos.naming.push.v2.executor;

import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.task.NamingPushCallback;
import org.springframework.stereotype.Component;

import java.util.Optional;
Expand Down Expand Up @@ -48,7 +48,8 @@ public void doPush(String clientId, Subscriber subscriber, PushDataWrapper data)
}

@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.remote.request.NotifySubscriberRequest;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.task.NamingPushCallback;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import org.springframework.stereotype.Component;

Expand All @@ -42,16 +42,22 @@ public PushExecutorRpcImpl(RpcPushService pushService) {

@Override
public void doPush(String clientId, Subscriber subscriber, PushDataWrapper data) {
pushService.pushWithoutAck(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)));
pushService.pushWithoutAck(clientId,
NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)));
}

@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)),
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
callBack.setActualServiceInfo(actualServiceInfo);
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
callBack, GlobalExecutor.getCallbackExecutor());
}

private ServiceInfo getServiceInfo(PushDataWrapper data, Subscriber subscriber) {
return ServiceUtil.selectInstancesWithHealthyProtection(data.getOriginalData(), data.getServiceMetadata(), false, true, subscriber);
return ServiceUtil
.selectInstancesWithHealthyProtection(data.getOriginalData(), data.getServiceMetadata(), false, true,
subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.UdpPushService;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.task.NamingPushCallback;
import com.alibaba.nacos.naming.utils.ServiceUtil;
import org.springframework.stereotype.Component;

Expand All @@ -42,12 +42,16 @@ public PushExecutorUdpImpl(UdpPushService pushService) {

@Override
public void doPush(String clientId, Subscriber subscriber, PushDataWrapper data) {
pushService.pushDataWithoutCallback(subscriber, handleClusterData(replaceServiceInfoName(data, subscriber), subscriber));
pushService.pushDataWithoutCallback(subscriber,
handleClusterData(replaceServiceInfoName(data, subscriber), subscriber));
}

@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
pushService.pushDataWithCallback(subscriber, handleClusterData(replaceServiceInfoName(data, subscriber), subscriber), callBack);
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
ServiceInfo actualServiceInfo = replaceServiceInfoName(data, subscriber);
callBack.setActualServiceInfo(actualServiceInfo);
pushService.pushDataWithCallback(subscriber, handleClusterData(actualServiceInfo, subscriber), callBack);
}

/**
Expand All @@ -64,8 +68,9 @@ public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataW
* @return new service info for 1.x
*/
private ServiceInfo replaceServiceInfoName(PushDataWrapper originalData, Subscriber subscriber) {
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(originalData.getOriginalData(), originalData.getServiceMetadata(),
false, true, subscriber);
ServiceInfo serviceInfo = ServiceUtil
.selectInstancesWithHealthyProtection(originalData.getOriginalData(), originalData.getServiceMetadata(),
false, true, subscriber);
ServiceInfo result = new ServiceInfo();
result.setName(NamingUtils.getGroupedName(serviceInfo.getName(), serviceInfo.getGroupName()));
result.setClusters(serviceInfo.getClusters());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 1999-2021 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.push.v2.task;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;

/**
* Push callback for Naming.
*
* @author xiweng.yy
*/
public interface NamingPushCallback extends PushCallBack {

/**
* Set actual pushed service info, the host list of service info may be changed by selector. Detail see implement of
* {@link com.alibaba.nacos.naming.push.v2.executor.PushExecutor}.
*
* @param serviceInfo actual pushed service info
*/
void setActualServiceInfo(ServiceInfo serviceInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alibaba.nacos.naming.push.v2.task;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
Expand All @@ -32,7 +31,6 @@
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.hook.PushResult;
import com.alibaba.nacos.naming.push.v2.hook.PushResultHookHolder;
import com.alibaba.nacos.naming.utils.ServiceUtil;

import java.util.Collection;

Expand Down Expand Up @@ -68,7 +66,7 @@ public void run() {
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
Expand All @@ -87,7 +85,7 @@ private Collection<String> getTargetClientIds() {
: delayTask.getTargetClients();
}

private class NamingPushCallback implements PushCallBack {
private class ServicePushCallback implements NamingPushCallback {

private final String clientId;

Expand All @@ -102,13 +100,20 @@ private class NamingPushCallback implements PushCallBack {

private final boolean isPushToAll;

private NamingPushCallback(String clientId, Subscriber subscriber, ServiceInfo serviceInfo,
/**
* The actual pushed service info, the host list of service info may be changed by selector. Detail see
* implement of {@link com.alibaba.nacos.naming.push.v2.executor.PushExecutor}.
*/
private ServiceInfo actualServiceInfo;

private ServicePushCallback(String clientId, Subscriber subscriber, ServiceInfo serviceInfo,
boolean isPushToAll) {
this.clientId = clientId;
this.subscriber = subscriber;
this.serviceInfo = serviceInfo;
this.isPushToAll = isPushToAll;
this.executeStartTime = System.currentTimeMillis();
this.actualServiceInfo = serviceInfo;
}

@Override
Expand All @@ -118,29 +123,30 @@ public long getTimeout() {

@Override
public void onSuccess() {
ServiceInfo serviceInfo = getServiceInfo(service, this.serviceInfo);
long pushFinishTime = System.currentTimeMillis();
long pushCostTimeForNetWork = pushFinishTime - executeStartTime;
long pushCostTimeForAll = pushFinishTime - delayTask.getLastProcessTime();
long serviceLevelAgreementTime = pushFinishTime - service.getLastUpdatedTime();
if (isPushToAll) {
Loggers.PUSH.info("[PUSH-SUCC] {}ms, all delay time {}ms, SLA {}ms, {}, DataSize={}, target={}",
pushCostTimeForNetWork, pushCostTimeForAll, serviceLevelAgreementTime, service,
serviceInfo.getHosts().size(), subscriber.getIp());
Loggers.PUSH
.info("[PUSH-SUCC] {}ms, all delay time {}ms, SLA {}ms, {}, originalSize={}, DataSize={}, target={}",
pushCostTimeForNetWork, pushCostTimeForAll, serviceLevelAgreementTime, service,
serviceInfo.getHosts().size(), actualServiceInfo.getHosts().size(), subscriber.getIp());
} else {
Loggers.PUSH.info("[PUSH-SUCC] {}ms, all delay time {}ms for subscriber {}, {}, DataSize={}",
pushCostTimeForNetWork, pushCostTimeForAll, subscriber.getIp(), service,
serviceInfo.getHosts().size());
Loggers.PUSH
.info("[PUSH-SUCC] {}ms, all delay time {}ms for subscriber {}, {}, originalSize={}, DataSize={}",
pushCostTimeForNetWork, pushCostTimeForAll, subscriber.getIp(), service,
serviceInfo.getHosts().size(), actualServiceInfo.getHosts().size());
}
PushResult result = PushResult.pushSuccess(service, clientId, serviceInfo, subscriber,
pushCostTimeForNetWork, pushCostTimeForAll, serviceLevelAgreementTime, isPushToAll);
PushResult result = PushResult
.pushSuccess(service, clientId, actualServiceInfo, subscriber, pushCostTimeForNetWork,
pushCostTimeForAll, serviceLevelAgreementTime, isPushToAll);
NotifyCenter.publishEvent(getPushServiceTraceEvent(pushFinishTime, result));
PushResultHookHolder.getInstance().pushSuccess(result);
}

@Override
public void onFail(Throwable e) {
ServiceInfo serviceInfo = getServiceInfo(service, this.serviceInfo);
long pushCostTime = System.currentTimeMillis() - executeStartTime;
Loggers.PUSH.error("[PUSH-FAIL] {}ms, {}, reason={}, target={}", pushCostTime, service, e.getMessage(),
subscriber.getIp());
Expand All @@ -149,16 +155,13 @@ public void onFail(Throwable e) {
delayTaskEngine.addTask(service,
new PushDelayTask(service, PushConfig.getInstance().getPushTaskRetryDelay(), clientId));
}
PushResult result = PushResult.pushFailed(service, clientId, serviceInfo, subscriber, pushCostTime, e,
isPushToAll);
PushResult result = PushResult
.pushFailed(service, clientId, actualServiceInfo, subscriber, pushCostTime, e, isPushToAll);
PushResultHookHolder.getInstance().pushFailed(result);
}

private ServiceInfo getServiceInfo(Service service, ServiceInfo serviceInfo) {
ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service)
.orElse(null);
return ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, false, true,
subscriber);
public void setActualServiceInfo(ServiceInfo actualServiceInfo) {
this.actualServiceInfo = actualServiceInfo;
}

private NamingTraceEvent.PushServiceTraceEvent getPushServiceTraceEvent(long eventTime, PushResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testRunHealthyInstanceWithHeartBeat() {
@Test
public void testRunHealthyInstanceWithTimeoutFromInstance() throws InterruptedException {
injectInstance(true, System.currentTimeMillis()).getExtendDatum()
.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, 1000);
.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, 800);
when(globalConfig.isExpireInstance()).thenReturn(true);
TimeUnit.SECONDS.sleep(1);
beatCheckTask.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.mock.env.MockEnvironment;

import java.net.InetSocketAddress;
import java.util.Collection;
Expand All @@ -32,6 +34,7 @@ public class NamingSubscriberServiceV1ImplTest {

@BeforeClass
public static void setUp() throws Exception {
EnvUtil.setEnvironment(new MockEnvironment());
namingSubscriberService = new NamingSubscriberServiceV1Impl();
InetSocketAddress socketAddress = new InetSocketAddress(8848);
PushClient pushClient = new PushClient("1", "G1@@S1", "", "", socketAddress, null, "", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package com.alibaba.nacos.naming.push.v2.executor;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.task.NamingPushCallback;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -48,7 +48,7 @@ public class PushExecutorDelegateTest {
private Subscriber subscriber;

@Mock
private PushCallBack pushCallBack;
private NamingPushCallback pushCallBack;

private PushDataWrapper pushdata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v2.PushDataWrapper;
import com.alibaba.nacos.naming.push.v2.task.NamingPushCallback;
import com.alibaba.nacos.naming.selector.SelectorManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class PushExecutorRpcImplTest {
private Subscriber subscriber;

@Mock
private PushCallBack pushCallBack;
private NamingPushCallback pushCallBack;

@Mock
private SelectorManager selectorManager;
Expand All @@ -76,6 +77,7 @@ public class PushExecutorRpcImplTest {

@Before
public void setUp() throws Exception {
EnvUtil.setEnvironment(new MockEnvironment());
serviceMetadata = new ServiceMetadata();
pushData = new PushDataWrapper(serviceMetadata, new ServiceInfo("G@@S"));
pushExecutor = new PushExecutorRpcImpl(pushService);
Expand All @@ -85,8 +87,8 @@ public void setUp() throws Exception {
eq(GlobalExecutor.getCallbackExecutor()));
ApplicationUtils.injectContext(context);
when(context.getBean(SelectorManager.class)).thenReturn(selectorManager);
when(selectorManager.select(any(), any(), any())).then(
(Answer<List<Instance>>) invocationOnMock -> invocationOnMock.getArgument(2));
when(selectorManager.select(any(), any(), any()))
.then((Answer<List<Instance>>) invocationOnMock -> invocationOnMock.getArgument(2));
}

@Test
Expand Down
Loading

0 comments on commit ed81d3c

Please sign in to comment.