Skip to content

Commit

Permalink
Revert "Local references support mergeable (#9645)" (#10707)
Browse files Browse the repository at this point in the history
This reverts commit 72326f4.
  • Loading branch information
AlbumenJ committed Oct 9, 2022
1 parent c374c0e commit 15f4a7d
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 181 deletions.
Expand Up @@ -448,7 +448,11 @@ private void createInvokerForLocal(Map<String, String> referenceParameters) {
URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName(), referenceParameters);
url = url.setScopeModel(getScopeModel());
url = url.setServiceModel(consumerModel);
invoker = protocolSPI.refer(interfaceClass, url);
Invoker<?> withFilter = protocolSPI.refer(interfaceClass, url);
// Local Invoke ( Support Cluster Filter / Filter )
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(withFilter);
invoker = Cluster.getCluster(url.getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(url, invokers), true);

if (logger.isInfoEnabled()) {
logger.info("Using in jvm service " + interfaceClass.getName());
Expand Down
Expand Up @@ -477,9 +477,10 @@ public void testCreateInvokerForLocalRefer() {
.initialize();

referenceConfig.init();
Invoker<?> withFilter = ((ListenerInvokerWrapper<?>) referenceConfig.getInvoker()).getInvoker();
withFilter = ((MockClusterInvoker<?>) withFilter).getDirectory().getAllInvokers().get(0);
Assertions.assertTrue(withFilter instanceof InjvmInvoker);
Assertions.assertTrue(referenceConfig.getInvoker() instanceof MockClusterInvoker);
Invoker<?> withFilter = ((MockClusterInvoker<?>) referenceConfig.getInvoker()).getDirectory().getAllInvokers().get(0);
Assertions.assertTrue(withFilter instanceof ListenerInvokerWrapper);
Assertions.assertTrue(((ListenerInvokerWrapper<?>) withFilter).getInvoker() instanceof InjvmInvoker);
URL url = withFilter.getUrl();
Assertions.assertEquals("application1", url.getParameter("application"));
Assertions.assertEquals("value1", url.getParameter("key1"));
Expand Down
Expand Up @@ -253,7 +253,7 @@ public List<String> getEffectReferenceRegistryURLs() {

protected static class MultipleNotifyListenerWrapper implements NotifyListener {

Map<URL, SingleNotifyListener> registryMap = new ConcurrentHashMap<>(4);
Map<URL, SingleNotifyListener> registryMap = new ConcurrentHashMap<URL, SingleNotifyListener>(4);
NotifyListener sourceNotifyListener;

public MultipleNotifyListenerWrapper(NotifyListener sourceNotifyListener) {
Expand Down
Expand Up @@ -42,6 +42,7 @@

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -62,18 +63,18 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {

private final String key;

private final Exporter<?> exporter;
private final Map<String, Exporter<?>> exporterMap;

private final ExecutorRepository executorRepository;

private final ParamDeepCopyUtil paramDeepCopyUtil;

private final boolean shouldIgnoreSameModule;

InjvmInvoker(Class<T> type, URL url, String key, Exporter<?> exporter) {
InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
this.exporter = exporter;
this.exporterMap = exporterMap;
this.executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
this.paramDeepCopyUtil = url.getOrDefaultFrameworkModel().getExtensionLoader(ParamDeepCopyUtil.class)
.getExtension(url.getParameter(CommonConstants.INJVM_COPY_UTIL_KEY, DefaultParamDeepCopyUtil.NAME));
Expand All @@ -82,6 +83,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {

@Override
public boolean isAvailable() {
InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
if (exporter == null) {
return false;
} else {
Expand All @@ -91,6 +93,7 @@ public boolean isAvailable() {

@Override
public Result doInvoke(Invocation invocation) throws Throwable {
Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}
Expand Down
Expand Up @@ -18,29 +18,19 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.support.MergeableCluster;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.support.ProtocolUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.dubbo.common.constants.CommonConstants.BROADCAST_CLUSTER;
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
Expand Down Expand Up @@ -79,7 +69,7 @@ static Exporter<?> getExporter(Map<String, Exporter<?>> map, URL key) {
if (result == null) {
return null;
} else if (ProtocolUtils.isGeneric(
result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
return null;
} else {
return result;
Expand All @@ -98,15 +88,7 @@ public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
// group="a,b" or group="*"
String group = url.getParameter(GROUP_KEY);
if (StringUtils.isNotEmpty(group)) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doCreateInvoker(url, Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), serviceType);
}
}
Cluster cluster = Cluster.getCluster(url.getScopeModel(), url.getParameter(CLUSTER_KEY));
return doCreateInvoker(url, cluster, serviceType);
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}

public boolean isInjvmRefer(URL url) {
Expand Down Expand Up @@ -134,34 +116,4 @@ public boolean isInjvmRefer(URL url) {
return false;
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
protected <T> ClusterInvoker<T> doCreateInvoker(URL url, Cluster cluster, Class<T> type) {
StaticDirectory directory = new StaticDirectory(url, getInvokers(exporterMap, url, type));
return (ClusterInvoker<T>) cluster.join(directory, true);
}

private <T> List<Invoker<T>> getInvokers(Map<String, Exporter<?>> map, URL url, Class<T> type) {
List<Invoker<T>> result = new ArrayList<>();

if (!url.getServiceKey().contains("*")) {
Exporter<?> exporter = map.get(url.getServiceKey());
InjvmInvoker<T> invoker = new InjvmInvoker<>(type, url, url.getServiceKey(), exporter);
result.add(invoker);
} else {
if (CollectionUtils.isNotEmptyMap(map)) {
for (Exporter<?> exporter : map.values()) {
if (UrlUtils.isServiceKeyMatch(url, exporter.getInvoker().getUrl())) {
URL providerUrl = exporter.getInvoker().getUrl();
URL consumerUrl = url.addParameter(GROUP_KEY, providerUrl.getGroup())
.addParameter(VERSION_KEY, providerUrl.getVersion());
InjvmInvoker<T> invoker = new InjvmInvoker<>(type, consumerUrl, consumerUrl.getServiceKey(), exporter);
result.add(invoker);
}
}
}
}

return result;
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Expand Up @@ -29,16 +29,18 @@
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static org.apache.dubbo.common.constants.CommonConstants.*;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
import static org.apache.dubbo.rpc.Constants.SCOPE_LOCAL;
import static org.apache.dubbo.rpc.Constants.SCOPE_REMOTE;
import static org.apache.dubbo.rpc.Constants.MERGER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -74,7 +76,7 @@ public void testLocalProtocol() throws Exception {
assertEquals(service.getSize(new String[]{"", "", ""}), 3);
service.invoke("injvm://127.0.0.1/TestService", "invoke");

InjvmInvoker<?> injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, null);
InjvmInvoker<?> injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new HashMap<>());
assertFalse(injvmInvoker.isAvailable());

}
Expand Down Expand Up @@ -135,36 +137,4 @@ public void testLocalProtocolAsync() throws Exception {
assertNull(service.getAsyncResult());
}

@Test
public void testLocalProtocolForMergeResult() throws Exception {
HelloService helloService1 = new Hello1ServiceImpl();
URL url = URL.valueOf("injvm://127.0.0.1/HelloService")
.addParameter(INTERFACE_KEY, HelloService.class.getName())
.addParameter(APPLICATION_KEY, "consumer")
.addParameter(GROUP_KEY, "g1");
Invoker<?> invoker1 = proxy.getInvoker(helloService1, HelloService.class, url);
assertTrue(invoker1.isAvailable());
Exporter<?> exporter1 = protocol.export(invoker1);
exporters.add(exporter1);

URL url2 = URL.valueOf("injvm://127.0.0.1/HelloService")
.addParameter(INTERFACE_KEY, HelloService.class.getName())
.addParameter(APPLICATION_KEY, "consumer")
.addParameter(GROUP_KEY, "g2");
HelloService helloService2 = new Hello2ServiceImpl();
Invoker<?> invoker2 = proxy.getInvoker(helloService2, HelloService.class, url2);
assertTrue(invoker2.isAvailable());
Exporter<?> exporter2 = protocol.export(invoker2);
exporters.add(exporter2);


URL referUrl = URL.valueOf("injvm://127.0.0.1/HelloService")
.addParameter(INTERFACE_KEY, HelloService.class.getName())
.addParameter(APPLICATION_KEY, "consumer")
.addParameter(GROUP_KEY, "*")
.addParameter(MERGER_KEY, "list");
List<String> list = proxy.getProxy(protocol.refer(HelloService.class, referUrl)).hellos();
assertEquals(2, list.size());
}

}

0 comments on commit 15f4a7d

Please sign in to comment.