Skip to content
Permalink
Browse files
Merge pull request #1827, support generic invoke and attachment for h…
…ttp/hessian protocol.

fixes  #1768, #19
  • Loading branch information
jerrick-zhu authored and chickenlj committed Jun 1, 2018
1 parent b7dde72 commit 286fb12b011fd878b3d44ff6985e082e2141a164
Showing 24 changed files with 763 additions and 21 deletions.
@@ -27,6 +27,11 @@ public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return null;
}

@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
return null;
}

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
return null;
@@ -36,6 +36,15 @@ public interface ProxyFactory {
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;

/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

/**
* create invoker.
*
@@ -32,10 +32,12 @@
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericException;
import com.alibaba.dubbo.rpc.service.GenericService;
import com.alibaba.dubbo.rpc.support.ProtocolUtils;

import java.io.IOException;
@@ -52,7 +54,7 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !ProtocolUtils.isGeneric(invoker.getUrl().getParameter(Constants.GENERIC_KEY))) {
&& !invoker.getInterface().equals(GenericService.class)) {
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
@@ -63,6 +65,11 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);

if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}

if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
@@ -155,13 +155,13 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept

for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(byte[].class.getName(), arg.getClass().getName());
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(JavaBeanDescriptor.class.getName(), arg.getClass().getName());
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
@@ -172,10 +172,10 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
return invoker.invoke(invocation);
}

private void error(String expected, String actual) throws RpcException {
private void error(String generic, String expected, String actual) throws RpcException {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
generic +
"] only support message type " +
expected +
" and your message type is " +
@@ -68,7 +68,7 @@ public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
if (exporter != null) {
return exporter;
}
final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
@@ -89,12 +89,12 @@ public void unexport() {

@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url);
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = tagert.invoke(invocation);
Result result = target.invoke(invocation);
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
@@ -22,6 +22,7 @@
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.service.EchoService;
import com.alibaba.dubbo.rpc.service.GenericService;

/**
* AbstractProxyFactory
@@ -30,6 +31,11 @@ public abstract class AbstractProxyFactory implements ProxyFactory {

@Override
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return getProxy(invoker, false);
}

@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
@@ -46,6 +52,15 @@ public <T> T getProxy(Invoker<T> invoker) throws RpcException {
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}

if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
interfaces[len] = GenericService.class;
}

return getProxy(invoker, interfaces);
}

@@ -54,6 +54,11 @@ public void setProtocol(Protocol protocol) {
this.protocol = protocol;
}

@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
return proxyFactory.getProxy(invoker, generic);
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
@@ -48,5 +48,11 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-serialization-jdk</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.rpc.protocol.hessian;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.rpc.RpcContext;
import com.caucho.hessian.client.HessianConnection;
import com.caucho.hessian.client.HessianURLConnectionFactory;

import java.io.IOException;
import java.net.URL;

public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory {

@Override
public HessianConnection open(URL url) throws IOException {
HessianConnection connection = super.open(url);
RpcContext context = RpcContext.getContext();
for (String key : context.getAttachments().keySet()) {
connection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
}

return connection;
}
}
@@ -25,8 +25,11 @@
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;

import com.alibaba.dubbo.rpc.service.GenericService;
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
import com.caucho.hessian.HessianException;
import com.caucho.hessian.client.HessianConnectionException;
import com.caucho.hessian.client.HessianConnectionFactory;
import com.caucho.hessian.client.HessianProxyFactory;
import com.caucho.hessian.io.HessianMethodSerializationException;
import com.caucho.hessian.server.HessianSkeleton;
@@ -37,6 +40,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -73,19 +77,31 @@ protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcExcept
serverMap.put(addr, server);
}
final String path = url.getAbsolutePath();
HessianSkeleton skeleton = new HessianSkeleton(impl, type);
final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
skeletonMap.put(path, skeleton);

final String genericPath = path + "/" + Constants.GENERIC_KEY;
skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));

return new Runnable() {
@Override
public void run() {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
}
};
}

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
String generic = url.getParameter(Constants.GENERIC_KEY);
boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
if (isGeneric) {
RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic);
url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
}

HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST);
hessianProxyFactory.setHessian2Request(isHessian2Request);
@@ -96,6 +112,10 @@ protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
} else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
} else {
HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
}
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
@@ -148,6 +168,16 @@ public void handle(HttpServletRequest request, HttpServletResponse response)
response.setStatus(500);
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());

Enumeration<String> enumeration = request.getHeaderNames();
while (enumeration.hasMoreElements()) {
String key = enumeration.nextElement();
if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),
request.getHeader(key));
}
}

try {
skeleton.invoke(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
@@ -16,6 +16,8 @@
*/
package com.alibaba.dubbo.rpc.protocol.hessian;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.rpc.RpcContext;
import com.caucho.hessian.client.HessianConnection;
import com.caucho.hessian.client.HessianConnectionFactory;
import com.caucho.hessian.client.HessianProxyFactory;
@@ -41,7 +43,12 @@ public void setHessianProxyFactory(HessianProxyFactory factory) {

@Override
public HessianConnection open(URL url) throws IOException {
return new HttpClientConnection(httpClient, url);
HttpClientConnection httpClientConnection = new HttpClientConnection(httpClient, url);
RpcContext context = RpcContext.getContext();
for (String key : context.getAttachments().keySet()) {
httpClientConnection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
}
return httpClientConnection;
}

}

0 comments on commit 286fb12

Please sign in to comment.