Skip to content

Commit

Permalink
Merge pull request #1843, support implicit delivery of attachments fr…
Browse files Browse the repository at this point in the history
…om provider to consumer.

Fixes #889, #1466, #1834, #1466, #1524
  • Loading branch information
chickenlj committed May 31, 2018
1 parent 5f5fecd commit e506367
Show file tree
Hide file tree
Showing 38 changed files with 353 additions and 169 deletions.
Expand Up @@ -101,10 +101,10 @@ private String toKey(Object[] args) {

private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}

private long hash(byte[] digest, int number) {
Expand Down
Expand Up @@ -156,7 +156,7 @@ public class Constants {

public static final String LOADBALANCE_KEY = "loadbalance";

// key for router type, for e.g., "script"/"file", corresponding to ScriptRouterFactory.NAME, FileRouterFactory.NAME
// key for router type, for e.g., "script"/"file", corresponding to ScriptRouterFactory.NAME, FileRouterFactory.NAME
public static final String ROUTER_KEY = "router";

public static final String CLUSTER_KEY = "cluster";
Expand Down Expand Up @@ -624,7 +624,7 @@ public class Constants {
public static final String QOS_PORT = "qos.port";

public static final String ACCEPT_FOREIGN_IP = "qos.accept.foreign.ip";

public static final String HESSIAN2_REQUEST_KEY = "hessian2.request";

public static final boolean DEFAULT_HESSIAN2_REQUEST = false;
Expand Down
46 changes: 43 additions & 3 deletions dubbo-common/src/main/java/com/alibaba/dubbo/common/Version.java
Expand Up @@ -23,17 +23,28 @@
import java.net.URL;
import java.security.CodeSource;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Version
*/
public final class Version {

private static final String DEFAULT_DUBBO_VERSION = "2.0.0";
private static final Logger logger = LoggerFactory.getLogger(Version.class);
private static final String VERSION = getVersion(Version.class, DEFAULT_DUBBO_VERSION);

// Dubbo RPC protocol version
public static final String DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2";
// Dubbo implementation version, usually is jar version.
private static final String VERSION = getVersion(Version.class, "");

/**
* For protocol compatibility purpose.
* Because {@link #isSupportResponseAttatchment} is checked for every call, int compare expect to has higher performance than string.
*/
private static final int LOWEST_VERSION_FOR_RESPONSE_ATTATCHMENT = 202; // 2.0.2
private static final Map<String, Integer> VERSION2INT = new HashMap<String, Integer>();

static {
// check if there's duplicated jar
Expand All @@ -43,10 +54,39 @@ public final class Version {
private Version() {
}

public static String getProtocolVersion() {
return DEFAULT_DUBBO_PROTOCOL_VERSION;
}

public static String getVersion() {
return VERSION;
}

public static boolean isSupportResponseAttatchment(String version) {
if (version == null || version.length() == 0) {
return false;
}
return getIntVersion(version) >= LOWEST_VERSION_FOR_RESPONSE_ATTATCHMENT;
}

public static int getIntVersion(String version) {
Integer v = VERSION2INT.get(version);
if (v == null) {
v = parseInt(version);
VERSION2INT.put(version, v);
}
return v;
}

private static int parseInt(String version) {
int v = 0;
String[] vArr = version.split("\\.");
int len = vArr.length;
for (int i = 1; i <= len; i++) {
v += Integer.parseInt(vArr[len - i]) * Math.pow(10, i - 1);
}
return v;
}

private static boolean hasResource(String path) {
try {
Expand Down
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.version;


import com.alibaba.dubbo.common.Version;

import org.junit.Assert;
import org.junit.Test;

public class VersionTest {

@Test
public void testGetProtocolVersion() {
Assert.assertEquals(Version.getProtocolVersion(), Version.DEFAULT_DUBBO_PROTOCOL_VERSION);
}

@Test
public void testSupportResponseAttatchment() {
Assert.assertTrue(Version.isSupportResponseAttatchment("2.0.2"));
Assert.assertTrue(Version.isSupportResponseAttatchment("2.0.3"));
Assert.assertFalse(Version.isSupportResponseAttatchment("2.0.0"));
}
}
Expand Up @@ -174,7 +174,7 @@ protected List<URL> loadRegistries(boolean provider) {
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down Expand Up @@ -220,7 +220,7 @@ protected URL loadMonitor(URL registryURL) {
appendProperties(monitor);
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
map.put("dubbo", Version.getVersion());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down
Expand Up @@ -280,7 +280,7 @@ private void init() {
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down
Expand Up @@ -367,7 +367,7 @@ private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> r

Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.dubbo.config.ServiceConfig;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.RpcInvocation;

import junit.framework.TestCase;
import org.junit.Test;

Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.alibaba.dubbo.config.spring.ReferenceBean;
import com.alibaba.dubbo.config.spring.ServiceBean;
import com.alibaba.dubbo.rpc.Protocol;

import org.springframework.beans.PropertyValue;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
Expand Down Expand Up @@ -213,7 +214,7 @@ private static BeanDefinition parse(Element element, ParserContext parserContext
String invokeRefMethod = value.substring(index + 1);
reference = new RuntimeBeanReference(invokeRef);
beanDefinition.getPropertyValues().addPropertyValue("oninvokeMethod", invokeRefMethod);
}else {
} else {
if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
if (!refBean.isSingleton()) {
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package com.alibaba.dubbo.remoting.exchange.codec;

import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.StreamUtils;
import com.alibaba.dubbo.common.logger.Logger;
Expand Down Expand Up @@ -173,7 +174,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
Expand Down Expand Up @@ -231,7 +232,7 @@ protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData());
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
Expand Down Expand Up @@ -274,7 +275,7 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re
if (res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult());
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else out.writeUTF(res.getErrorMessage());
out.flushBuffer();
Expand Down Expand Up @@ -442,4 +443,13 @@ protected void encodeResponseData(Channel channel, ObjectOutput out, Object data
encodeResponseData(out, data);
}

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
encodeRequestData(out, data);
}

protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
encodeResponseData(out, data);
}


}
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void send(Object message, boolean sent) throws RemotingException {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion("2.0.0");
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
Expand All @@ -107,7 +108,7 @@ public ResponseFuture request(Object request, int timeout) throws RemotingExcept
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Request;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -133,7 +134,7 @@ private void sendChannelReadOnlyEvent() {
Request request = new Request();
request.setEvent(Request.READONLY_EVENT);
request.setTwoWay(false);
request.setVersion(Version.getVersion());
request.setVersion(Version.getProtocolVersion());

Collection<Channel> channels = getChannels();
for (Channel channel : channels) {
Expand Down
Expand Up @@ -17,6 +17,7 @@

package com.alibaba.dubbo.remoting.exchange.support.header;

import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void run() {
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
Expand Down
Expand Up @@ -18,6 +18,7 @@


import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.UnsafeByteArrayOutputStream;
Expand Down Expand Up @@ -216,7 +217,7 @@ public void test_Decode_Return_Request_Event_Object() throws IOException {
Assert.assertEquals(person, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(true, obj.isEvent());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand All @@ -231,7 +232,7 @@ public void test_Decode_Return_Request_Event_String() throws IOException {
Assert.assertEquals(event, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(true, obj.isEvent());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand All @@ -244,7 +245,7 @@ public void test_Decode_Return_Request_Heartbeat_Object() throws IOException {
Assert.assertEquals(null, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(true, obj.isHeartbeat());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand All @@ -259,7 +260,7 @@ public void test_Decode_Return_Request_Object() throws IOException {
Assert.assertEquals(person, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(false, obj.isHeartbeat());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand Down Expand Up @@ -350,7 +351,7 @@ public void test_Encode_Response() throws IOException {
Assert.assertEquals(response.isHeartbeat(), obj.isHeartbeat());
Assert.assertEquals(person, obj.getResult());
// encode response verson ??
// Assert.assertEquals(response.getVersion(), obj.getVersion());
// Assert.assertEquals(response.getProtocolVersion(), obj.getVersion());

}

Expand Down Expand Up @@ -380,15 +381,15 @@ public void test_Encode_Error_Response() throws IOException {
Assert.assertEquals(response.isHeartbeat(), obj.isHeartbeat());
Assert.assertEquals(badString, obj.getErrorMessage());
Assert.assertEquals(null, obj.getResult());
// Assert.assertEquals(response.getVersion(), obj.getVersion());
// Assert.assertEquals(response.getProtocolVersion(), obj.getVersion());
}

// http://code.alibabatech.com/jira/browse/DUBBO-392
@Test
public void testMessageLengthGreaterThanMessageActualLength() throws Exception {
Channel channel = getCliendSideChannel(url);
Request request = new Request(1L);
request.setVersion("2.0.0");
request.setVersion(Version.getProtocolVersion());
Date date = new Date();
request.setData(date);
ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
Expand Down

0 comments on commit e506367

Please sign in to comment.