Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mostly modification to satisfy more complexing support scenrio such as service Level model #3264

Merged
merged 15 commits into from Feb 19, 2019
Merged
Expand Up @@ -55,6 +55,16 @@ public Invoker<?> getInvoker() {
return null;
}

@Override
public Object put(Object key, Object value) {
return null;
}

@Override
public Object get(Object key) {
return null;
}

public String getAttachment(String key) {
return getAttachments().get(key);
}
Expand Down
Expand Up @@ -72,6 +72,16 @@ public Invoker<?> getInvoker() {
return new Invoker.CompatibleInvoker(delegate.getInvoker());
}

@Override
public Object put(Object key, Object value) {
return delegate.put(key, value);
}

@Override
public Object get(Object key) {
return delegate.get(key);
}

@Override
public org.apache.dubbo.rpc.Invocation getOriginal() {
return delegate;
Expand Down
Expand Up @@ -83,5 +83,15 @@ public String getAttachment(String key, String defaultValue) {
public Invoker<?> getInvoker() {
return null;
}

@Override
public Object put(Object key, Object value) {
return null;
}

@Override
public Object get(Object key) {
return null;
}
}
}
Expand Up @@ -61,6 +61,16 @@ public Invoker<?> getInvoker() {
return null;
}

@Override
public Object put(Object key, Object value) {
return null;
}

@Override
public Object get(Object key) {
return null;
}

public String getAttachment(String key) {
return getAttachments().get(key);
}
Expand Down
Expand Up @@ -73,7 +73,7 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
*
* <li>when the url is dubbo://224.5.6.7:1234/org.apache.dubbo.config.api.DemoService?application=dubbo-sample, then
* the protocol is <b>DubboProtocol</b></li>
*
* <p>
* Actually,when the {@link ExtensionLoader} init the {@link Protocol} instants,it will automatically wraps two
* layers, and eventually will get a <b>ProtocolFilterWrapper</b> or <b>ProtocolListenerWrapper</b>
*/
Expand Down Expand Up @@ -300,7 +300,7 @@ private void init() {

ref = createProxy(map);

ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, ref, interfaceClass.getMethods(), attributes);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, interfaceClass.getMethods(), attributes);
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

Expand Down Expand Up @@ -417,14 +417,14 @@ private void createConsumerIfAbsent() {
return;
}
setConsumer(
ConfigManager.getInstance()
.getDefaultConsumer()
.orElseGet(() -> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.refresh();
return consumerConfig;
})
);
ConfigManager.getInstance()
.getDefaultConsumer()
.orElseGet(() -> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.refresh();
return consumerConfig;
})
);
}

private void completeCompoundConfigs() {
Expand Down
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;

import java.util.List;

@SPI
public interface AddressListener {

/**
* processing when receiving the address list
*
* @param addresses
*/
void notify(List<URL> addresses);

}
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.rpc.Invocation;
Expand Down Expand Up @@ -182,6 +183,16 @@ public synchronized void notify(List<URL> urls) {
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.toList());

/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
Set<String> surpportedListeners = addressListenerExtensionLoader.getSupportedExtensions();
for (String addressListenerName : surpportedListeners) {
addressListenerExtensionLoader.getExtension(addressListenerName).notify(categoryUrls);
}


/**
* TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
*/
Expand Down
Expand Up @@ -36,6 +36,7 @@ public class CodecSupport {
private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();

static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
Expand All @@ -51,6 +52,7 @@ public class CodecSupport {
}
ID_SERIALIZATION_MAP.put(idByte, serialization);
ID_SERIALIZATIONNAME_MAP.put(idByte, name);
SERIALIZATIONNAME_ID_MAP.put(name, idByte);
}
}

Expand All @@ -61,6 +63,10 @@ public static Serialization getSerializationById(Byte id) {
return ID_SERIALIZATION_MAP.get(id);
}

public static byte getIDByName(String name) {
return SERIALIZATIONNAME_ID_MAP.get(name);
}

public static Serialization getSerialization(URL url) {
return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
Expand Down
Expand Up @@ -83,4 +83,9 @@ public interface Invocation {
*/
Invoker<?> getInvoker();

Object put(Object key, Object value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we these interface level method documentation? what do you say?


Object get(Object key);


}
Expand Up @@ -42,6 +42,8 @@ public class RpcInvocation implements Invocation, Serializable {

private Map<String, String> attachments;

private Map<Object, Object> attributes = new HashMap<Object, Object>();

private transient Invoker<?> invoker;

public RpcInvocation() {
Expand Down Expand Up @@ -113,6 +115,14 @@ public void setInvoker(Invoker<?> invoker) {
this.invoker = invoker;
}

public Object put(Object key, Object value) {
return attributes.put(key, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider any thread safety here?
should we not prevent addition of adding NULL as key?

}

public Object get(Object key) {
return attributes.get(key);
}

@Override
public String getMethodName() {
return methodName;
Expand Down
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.model;

import org.apache.dubbo.common.extension.SPI;

@SPI
public interface ApplicationInitListener {
/**
* init the application
*/
void init();
}
Expand Up @@ -16,20 +16,23 @@
*/
package org.apache.dubbo.rpc.model;

import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Represent a application which is using Dubbo and store basic metadata info for using
* during the processing of RPC invoking.
*
* <p>
* ApplicationModel includes many ProviderModel which is about published services
* and many Consumer Model which is about subscribed services.
*
* <p>
* adjust project structure in order to fully utilize the methods introduced here.
*/
public class ApplicationModel {
Expand All @@ -47,6 +50,8 @@ public class ApplicationModel {

private static String application;

private static AtomicBoolean INIT_FLAG = new AtomicBoolean(false);

public static Collection<ConsumerModel> allConsumerModels() {
return consumedServices.values();
}
Expand All @@ -63,6 +68,16 @@ public static ConsumerModel getConsumerModel(String serviceName) {
return consumedServices.get(serviceName);
}

public static void init() {
if (INIT_FLAG.compareAndSet(false, true)) {
ExtensionLoader<ApplicationInitListener> extensionLoader = ExtensionLoader.getExtensionLoader(ApplicationInitListener.class);
Set<String> listenerNames = extensionLoader.getSupportedExtensions();
for (String listenerName : listenerNames) {
extensionLoader.getExtension(listenerName).init();
}
}
}

public static void initConsumerModel(String serviceName, ConsumerModel consumerModel) {
if (consumedServices.putIfAbsent(serviceName, consumerModel) != null) {
LOGGER.warn("Already register the same consumer:" + serviceName);
Expand Down
Expand Up @@ -20,6 +20,8 @@

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConsumerMethodModel {
private final Method method;
Expand All @@ -31,8 +33,9 @@ public class ConsumerMethodModel {
private final String methodName;
private final boolean generic;

private final AsyncMethodInfo asyncInfo;
private final ConcurrentMap<String, Object> attributeMap = new ConcurrentHashMap<>();

private final AsyncMethodInfo asyncInfo;

public ConsumerMethodModel(Method method, Map<String, Object> attributes) {
this.method = method;
Expand All @@ -53,6 +56,10 @@ public Method getMethod() {
return method;
}

public ConcurrentMap<String, Object> getAttributeMap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning the same copy of attributeMap may lead to programatic mistake as many different java file may modify it, we may not have the track of who is modifying what what modifying.

Should we not have a put and get method of attribute map it self in this class. What do you say?

return attributeMap;
}

public Class<?> getReturnClass() {
return returnClass;
}
Expand Down