Skip to content

Commit

Permalink
Fix notice and opt registry source (#913)
Browse files Browse the repository at this point in the history
* update year

* fix search

* add relation support

* disable swagger
  • Loading branch information
AlbumenJ committed Jul 13, 2022
1 parent ebfa2f7 commit 3a60910
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 23 deletions.
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Dubbo Admin
Copyright 2018-2021 The Apache Software Foundation
Copyright 2018-2022 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
4 changes: 2 additions & 2 deletions dubbo-admin-distribution/src/NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Dubbo Admin
Copyright 2018-2021 The Apache Software Foundation
Copyright 2018-2022 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down Expand Up @@ -100,7 +100,7 @@ dubbo NOTICE

========================================================================
Apache Dubbo
Copyright 2018-2021 The Apache Software Foundation
Copyright 2018-2022 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static Provider url2Provider(Pair<String, URL> pair) {
p.setService(service);
p.setAddress(url.getAddress());
p.setApplication(url.getParameter(Constants.APPLICATION_KEY));
p.setUrl(url.toIdentityString());
p.setUrl(url.toFullString());
p.setParameters(url.toParameterString());

p.setDynamic(url.getParameter("dynamic", true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

@Configuration
@EnableSwagger2
@ConditionalOnProperty(name = "swagger.enable", havingValue = "true", matchIfMissing = true)
@ConditionalOnProperty(name = "swagger.enable", havingValue = "true")
public class SwaggerConfiguration {
@Bean
public Docket createRestApi() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
* Entity
*
*/
public abstract class Entity implements Serializable {

Expand Down Expand Up @@ -128,4 +128,20 @@ public void setMiss(boolean miss) {
this.miss = miss;
}

@java.lang.Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Entity entity = (Entity) o;
return miss == entity.miss && Objects.equals(ids, entity.ids) && Objects.equals(id, entity.id) && Objects.equals(hash, entity.hash) && Objects.equals(created, entity.created) && Objects.equals(modified, entity.modified) && Objects.equals(now, entity.now) && Objects.equals(operator, entity.operator) && Objects.equals(operatorAddress, entity.operatorAddress);
}

@java.lang.Override
public int hashCode() {
return Objects.hash(ids, id, hash, created, modified, now, operator, operatorAddress, miss);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Provider
*
*/
public class Provider extends Entity {

Expand Down Expand Up @@ -221,4 +221,33 @@ public URL toUrl() {
return url;
}

public boolean equalsWithoutRegistry(Provider provider) {
if (this == provider) {
return true;
}
if (!super.equals(provider)) {
return false;
}
return dynamic == provider.dynamic && enabled == provider.enabled && weight == provider.weight && alived == provider.alived && Objects.equals(service, provider.service) && Objects.equals(parameters, provider.parameters) && Objects.equals(address, provider.address) && Objects.equals(registry, provider.registry) && Objects.equals(application, provider.application) && Objects.equals(username, provider.username) && Objects.equals(expired, provider.expired) && Objects.equals(override, provider.override) && Objects.equals(overrides, provider.overrides);
}

@java.lang.Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
Provider provider = (Provider) o;
return dynamic == provider.dynamic && enabled == provider.enabled && weight == provider.weight && alived == provider.alived && Objects.equals(service, provider.service) && Objects.equals(url, provider.url) && Objects.equals(parameters, provider.parameters) && Objects.equals(address, provider.address) && Objects.equals(registry, provider.registry) && Objects.equals(application, provider.application) && Objects.equals(username, provider.username) && Objects.equals(expired, provider.expired) && Objects.equals(override, provider.override) && Objects.equals(overrides, provider.overrides) && registrySource == provider.registrySource;
}

@java.lang.Override
public int hashCode() {
return Objects.hash(super.hashCode(), service, url, parameters, address, registry, dynamic, enabled, weight, application, username, expired, alived, override, overrides, registrySource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

public enum RegistrySource {

ALL,

INTERFACE,

INSTANCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void notifyAddressChanged(String protocolServiceKey, List<URL> urls) {
List<InstanceAddressURL> instanceAddressUrls = urls.stream().map(url -> (InstanceAddressURL) url).collect(Collectors.toList());
serviceMap.put(serviceKey, instanceAddressUrls);
}
instanceRegistryCache.refreshConsumer(false);
}

private String removeProtocol(String protocolServiceKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ public String getConsumerMetaData(MetadataIdentifier key) {

private String getMetaData(MetadataIdentifier identifier) {
try {
return configService.getConfig(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY),
group, 1000 * 10);
String fromDubboGroup = configService.getConfig(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY),
"dubbo", 1000 * 10);
return org.apache.dubbo.common.utils.StringUtils.isNotEmpty(fromDubboGroup) ? fromDubboGroup :
configService.getConfig(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY),
group, 1000 * 10);
} catch (NacosException e) {
logger.warn("Failed to get " + identifier + " from nacos, cause: " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
Expand All @@ -33,15 +35,22 @@
@Component
public class ConsumerServiceImpl extends AbstractService implements ConsumerService {

@Autowired
private InstanceRegistryQueryHelper instanceRegistryQueryHelper;

@Override
public List<Consumer> findByService(String service) {
return SyncUtils.url2ConsumerList(findConsumerUrlByService(service));
List<Consumer> consumers = SyncUtils.url2ConsumerList(findConsumerUrlByService(service));
consumers.addAll(instanceRegistryQueryHelper.findConsumerByService(service));
return consumers;
}


@Override
public List<Consumer> findAll() {
return SyncUtils.url2ConsumerList(findAllConsumerUrl());
List<Consumer> consumers = SyncUtils.url2ConsumerList(findAllConsumerUrl());
consumers.addAll(instanceRegistryQueryHelper.findAllConsumer());
return consumers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,29 @@

package org.apache.dubbo.admin.service.impl;

import org.apache.dubbo.admin.common.util.Constants;
import org.apache.dubbo.admin.service.RegistryCache;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.registry.client.InstanceAddressURL;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.rpc.service.Destroyable;

import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* instance registry url {@link InstanceAddressURL} cache
Expand All @@ -37,6 +50,10 @@ public class InstanceRegistryCache implements RegistryCache<String, ConcurrentMa

private final ConcurrentMap<String, ConcurrentMap<String, Map<String, List<InstanceAddressURL>>>> registryCache = new ConcurrentHashMap<>();

private final Map<String, Map<String, List<URL>>> subscribedCache = new ConcurrentHashMap<>();

private final AtomicBoolean startRefresh = new AtomicBoolean(false);

@Override
public void put(String key, ConcurrentMap<String, Map<String, List<InstanceAddressURL>>> value) {
registryCache.put(key, value);
Expand All @@ -52,4 +69,44 @@ public ConcurrentMap<String, Map<String, List<InstanceAddressURL>>> computeIfAbs
Function<? super String, ? extends ConcurrentMap<String, Map<String, List<InstanceAddressURL>>>> mappingFunction) {
return registryCache.computeIfAbsent(key, mappingFunction);
}

public Map<String, Map<String, List<URL>>> getSubscribedCache() {
return subscribedCache;
}

public synchronized void refreshConsumer(boolean refreshAll) {
if (startRefresh.compareAndSet(false, true)) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Consumer-Refresh"));
executorService.scheduleAtFixedRate(() -> refreshConsumer(true), 60, 60, TimeUnit.MINUTES);
}

Map<String, Map<String, List<URL>>> origin;

if (refreshAll) {
origin = new ConcurrentHashMap<>();
} else {
origin = subscribedCache;
}

Map<String, List<InstanceAddressURL>> providers = get(Constants.PROVIDERS_CATEGORY).values().stream()
.flatMap((e) -> e.values().stream())
.flatMap(Collection::stream)
.collect(Collectors.groupingBy(InstanceAddressURL::getAddress));

// remove cached
origin.keySet().forEach(providers::remove);

for (List<InstanceAddressURL> instanceAddressURLs : providers.values()) {
MetadataService metadataService = MetadataUtils.referProxy(instanceAddressURLs.get(0).getInstance());
try {
Set<String> subscribedURLs = metadataService.getSubscribedURLs();

Map<String, List<URL>> subscribed = subscribedURLs.stream().map(URL::valueOf).collect(Collectors.groupingBy(URL::getServiceKey));
origin.put(instanceAddressURLs.get(0).getAddress(), subscribed);
} catch (Throwable ignored) {

}
((Destroyable) metadataService).$destroy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
package org.apache.dubbo.admin.service.impl;

import org.apache.dubbo.admin.common.util.Constants;
import org.apache.dubbo.admin.common.util.Pair;
import org.apache.dubbo.admin.common.util.SyncUtils;
import org.apache.dubbo.admin.model.domain.Consumer;
import org.apache.dubbo.admin.model.domain.Provider;
import org.apache.dubbo.admin.model.domain.RegistrySource;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.InstanceAddressURL;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.RpcContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -85,6 +92,23 @@ public List<Provider> findByService(String serviceName) {
.collect(Collectors.toList());
}

public List<Consumer> findAllConsumer() {
return instanceRegistryCache.getSubscribedCache().values().stream()
.flatMap(m -> m.values().stream())
.flatMap(Collection::stream)
.map(m -> new Pair<>(m.toFullString(), m))
.map(SyncUtils::url2Consumer)
.collect(Collectors.toList());
}

public List<Consumer> findConsumerByService(String serviceName) {
return instanceRegistryCache.getSubscribedCache().values().stream().filter(m -> m.containsKey(serviceName))
.flatMap(m -> m.get(serviceName).stream())
.map(m -> new Pair<>(m.toFullString(), m))
.map(SyncUtils::url2Consumer)
.collect(Collectors.toList());
}

public List<Provider> findByAddress(String providerAddress) {
ConcurrentMap<String, Map<String, List<InstanceAddressURL>>> appInterfaceMap = instanceRegistryCache.get(Constants.PROVIDERS_CATEGORY);
if (appInterfaceMap == null) {
Expand Down Expand Up @@ -136,18 +160,30 @@ private List<Provider> urlsToProviderList(List<InstanceAddressURL> urls) {
MetadataInfo metadataInfo = url.getMetadataInfo();

metadataInfo.getServices().forEach((serviceKey, serviceInfo) -> {
// build consumer url

ServiceConfigURL consumerUrl = new URLBuilder()
.setProtocol(serviceInfo.getProtocol())
.setPath(serviceInfo.getPath())
.addParameter("group", serviceInfo.getGroup())
.addParameter("version", serviceInfo.getVersion())
.build();
RpcContext.getServiceContext().setConsumerUrl(consumerUrl);

Provider p = new Provider();
String service = serviceInfo.getServiceKey();
p.setService(service);
p.setAddress(url.getAddress());
p.setApplication(instance.getServiceName());
p.setUrl(url.toParameterString());
p.setUrl(url.toFullString());
p.setDynamic(url.getParameter("dynamic", true));
p.setEnabled(url.getParameter(Constants.ENABLED_KEY, true));
p.setWeight(url.getParameter(Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT));
p.setUsername(url.getParameter("owner"));
p.setRegistrySource(RegistrySource.INSTANCE);
providers.add(p);

RpcContext.getServiceContext().setConsumerUrl(null);
});
});
return providers;
Expand Down
Loading

0 comments on commit 3a60910

Please sign in to comment.