Skip to content

Commit

Permalink
Merge pull request #155 from allenxwang/issue-142
Browse files Browse the repository at this point in the history
Fix Issue #142
  • Loading branch information
tbak committed Aug 20, 2014
2 parents ba6d6d0 + 481c7ad commit 4bd4aa3
Show file tree
Hide file tree
Showing 25 changed files with 686 additions and 443 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ project(':ribbon-transport') {
compile "com.netflix.rxnetty:rx-netty:${rx_netty_version}"
compile "com.netflix.rxnetty:rx-netty-contexts:${rx_netty_version}"
compile "com.netflix.rxnetty:rx-netty-servo:${rx_netty_version}"
compile 'javax.inject:javax.inject:1'
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
testCompile project(':ribbon-test')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,35 @@ public RxMovieTemplateExample(int port) {
.withMaxAutoRetriesNextServer(3)
.withConfigurationBasedServerList("localhost:" + port));

registerMovieTemplate = httpResourceGroup.newRequestTemplate("registerMovie", ByteBuf.class)
registerMovieTemplate = httpResourceGroup.newTemplateBuilder("registerMovie", ByteBuf.class)
.withMethod("POST")
.withUriTemplate("/movies")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();

updateRecommendationTemplate = httpResourceGroup.newRequestTemplate("updateRecommendation", ByteBuf.class)
updateRecommendationTemplate = httpResourceGroup.newTemplateBuilder("updateRecommendation", ByteBuf.class)
.withMethod("POST")
.withUriTemplate("/users/{userId}/recommendations")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();

recommendationsByUserIdTemplate = httpResourceGroup.newRequestTemplate("recommendationsByUserId", ByteBuf.class)
recommendationsByUserIdTemplate = httpResourceGroup.newTemplateBuilder("recommendationsByUserId", ByteBuf.class)
.withMethod("GET")
.withUriTemplate("/users/{userId}/recommendations")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withFallbackProvider(new RecommendationServiceFallbackHandler())
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();

recommendationsByTemplate = httpResourceGroup.newRequestTemplate("recommendationsBy", ByteBuf.class)
recommendationsByTemplate = httpResourceGroup.newTemplateBuilder("recommendationsBy", ByteBuf.class)
.withMethod("GET")
.withUriTemplate("/recommendations?category={category}&ageGroup={ageGroup}")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withFallbackProvider(new RecommendationServiceFallbackHandler())
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
public class RibbonModule extends AbstractModule {
@Override
protected void configure() {
bind(RibbonResourceFactory.class).to(DefaultResourceFactory.class).in(Scopes.SINGLETON);
bind(RibbonTransportFactory.class).to(DefaultRibbonTransportFactory.class).in(Scopes.SINGLETON);
bind(ClientConfigFactory.class).to(DefaultClientConfigFactory.class).in(Scopes.SINGLETON);
bind(RibbonTransportFactory.class).to(DefaultRibbonTransportFactory.class).in(Scopes.SINGLETON);
bind(RibbonResourceFactory.class).to(DefaultResourceFactory.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,35 @@ public MyService(RibbonResourceFactory factory) {
.withMaxAutoRetriesNextServer(3)
.withConfigurationBasedServerList("localhost:" + PORT));

registerMovieTemplate = httpResourceGroup.newRequestTemplate("registerMovie", ByteBuf.class)
registerMovieTemplate = httpResourceGroup.newTemplateBuilder("registerMovie", ByteBuf.class)
.withMethod("POST")
.withUriTemplate("/movies")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();

updateRecommendationTemplate = httpResourceGroup.newRequestTemplate("updateRecommendation", ByteBuf.class)
updateRecommendationTemplate = httpResourceGroup.newTemplateBuilder("updateRecommendation", ByteBuf.class)
.withMethod("POST")
.withUriTemplate("/users/{userId}/recommendations")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();

recommendationsByUserIdTemplate = httpResourceGroup.newRequestTemplate("recommendationsByUserId", ByteBuf.class)
recommendationsByUserIdTemplate = httpResourceGroup.newTemplateBuilder("recommendationsByUserId", ByteBuf.class)
.withMethod("GET")
.withUriTemplate("/users/{userId}/recommendations")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withFallbackProvider(new RecommendationServiceFallbackHandler())
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();

recommendationsByTemplate = httpResourceGroup.newRequestTemplate("recommendationsBy", ByteBuf.class)
recommendationsByTemplate = httpResourceGroup.newTemplateBuilder("recommendationsBy", ByteBuf.class)
.withMethod("GET")
.withUriTemplate("/recommendations?category={category}&ageGroup={ageGroup}")
.withHeader("X-Platform-Version", "xyz")
.withHeader("X-Auth-Token", "abc")
.withFallbackProvider(new RecommendationServiceFallbackHandler())
.withResponseValidator(new RecommendationServiceResponseValidator());
.withResponseValidator(new RecommendationServiceResponseValidator()).build();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.client.config.ClientConfigFactory.DefaultClientConfigFactory;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.config.ConfigurationManager;
import com.netflix.ribbon.DefaultResourceFactory;
import com.netflix.ribbon.RibbonResourceFactory;
Expand All @@ -32,8 +33,11 @@
import com.netflix.ribbon.examples.rx.proxy.RxMovieProxyExample;
import com.netflix.ribbon.guice.RibbonModule;
import com.netflix.ribbon.guice.RibbonResourceProvider;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClient;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RxMovieProxyExampleTest extends RxMovieClientTestBase {
Expand Down Expand Up @@ -92,7 +96,23 @@ protected void configure() {

RxMovieProxyExample example = injector.getInstance(RxMovieProxyExample.class);
assertTrue(example.runExample());

}

@Test
public void testTransportFactoryWithInjection() {
Injector injector = Guice.createInjector(
new AbstractModule() {
@Override
protected void configure() {
bind(ClientConfigFactory.class).to(MyClientConfigFactory.class).in(Scopes.SINGLETON);
bind(RibbonTransportFactory.class).to(DefaultRibbonTransportFactory.class).in(Scopes.SINGLETON);
}
}
);

RibbonTransportFactory transportFactory = injector.getInstance(RibbonTransportFactory.class);
HttpClient<ByteBuf, ByteBuf> client = transportFactory.newHttpClient("myClient");
IClientConfig config = ((NettyHttpClient) client).getClientConfig();
assertEquals("MyConfig", config.getNameSpace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,37 @@
*/
package com.netflix.ribbon;

import com.netflix.client.config.ClientConfigFactory;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.netty.RibbonTransport;
import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.DatagramPacket;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.protocol.http.client.HttpClient;

import javax.inject.Inject;

/**
* A dependency injection friendly Ribbon transport client factory that can create clients based on IClientConfig or
* a name which is used to construct the necessary IClientConfig.
*
* Created by awang on 7/18/14.
*/
public abstract class RibbonTransportFactory {
protected final ClientConfigFactory clientConfigFactory;

public static class DefaultRibbonTransportFactory extends RibbonTransportFactory {
@Inject
public DefaultRibbonTransportFactory(ClientConfigFactory clientConfigFactory) {
super(clientConfigFactory);
}
}

public static final RibbonTransportFactory DEFAULT = new DefaultRibbonTransportFactory();
protected RibbonTransportFactory(ClientConfigFactory clientConfigFactory) {
this.clientConfigFactory = clientConfigFactory;
}

public static final RibbonTransportFactory DEFAULT = new DefaultRibbonTransportFactory(ClientConfigFactory.DEFAULT);

public HttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
return RibbonTransport.newHttpClient(config);
Expand All @@ -42,4 +58,22 @@ public RxClient<ByteBuf, ByteBuf> newTcpClient(IClientConfig config) {
public RxClient<DatagramPacket, DatagramPacket> newUdpClient(IClientConfig config) {
return RibbonTransport.newUdpClient(config);
}

public final HttpClient<ByteBuf, ByteBuf> newHttpClient(String name) {
IClientConfig config = clientConfigFactory.newConfig();
config.loadProperties(name);
return newHttpClient(config);
}

public final RxClient<ByteBuf, ByteBuf> newTcpClient(String name) {
IClientConfig config = clientConfigFactory.newConfig();
config.loadProperties(name);
return newTcpClient(config);
}

public RxClient<DatagramPacket, DatagramPacket> newUdpClient(String name) {
IClientConfig config = clientConfigFactory.newConfig();
config.loadProperties(name);
return newUdpClient(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,6 @@
*/
package com.netflix.client.netty.http;

import static com.netflix.ribbon.testutils.TestUtils.waitUntilTrueOrTimeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.contexts.ContextsContainer;
import io.reactivex.netty.contexts.ContextsContainerImpl;
import io.reactivex.netty.contexts.MapBackedKeySupplier;
import io.reactivex.netty.contexts.RxContexts;
import io.reactivex.netty.protocol.http.client.HttpClient.HttpClientConfig;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import io.reactivex.netty.servo.http.HttpClientListener;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

import com.google.common.collect.Lists;
import com.google.mockwebserver.MockResponse;
import com.google.mockwebserver.MockWebServer;
Expand All @@ -82,6 +42,40 @@
import com.sun.jersey.api.container.httpserver.HttpServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.net.httpserver.HttpServer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.contexts.ContextsContainer;
import io.reactivex.netty.contexts.ContextsContainerImpl;
import io.reactivex.netty.contexts.MapBackedKeySupplier;
import io.reactivex.netty.contexts.RxContexts;
import io.reactivex.netty.protocol.http.client.HttpClient.HttpClientConfig;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import io.reactivex.netty.servo.http.HttpClientListener;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static com.netflix.ribbon.testutils.TestUtils.waitUntilTrueOrTimeout;
import static org.junit.Assert.*;

public class NettyClientTest {

Expand Down Expand Up @@ -286,6 +280,11 @@ public void testObservableWithMultipleServers() throws Exception {
assertEquals(1, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(0, stats.getSuccessiveConnectionFailureCount());

person = getPersonObservable(lbObservables.submit(request)).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
HttpClientListener listener = lbObservables.getListener();
assertEquals(1, listener.getPoolReuse());
}

@Test
Expand Down Expand Up @@ -397,7 +396,6 @@ public Boolean call() {
}
});
assertEquals(0, listener.getPoolReuse());

// two requests to bad server because retry same server is set to 1
assertEquals(4, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
Expand Down Expand Up @@ -477,7 +475,6 @@ public Boolean call() {
});
assertEquals(2, listener.getConnectionCount());
assertEquals(0, listener.getPoolReuse());

assertEquals(2, externalListener.getPoolAcquires());
}

Expand Down
14 changes: 13 additions & 1 deletion ribbon/src/main/java/com/netflix/ribbon/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.ribbon;

import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;

import java.util.Map;
Expand All @@ -37,7 +38,18 @@ private ClientOptions() {
public static ClientOptions create() {
return new ClientOptions();
}


public static ClientOptions from(IClientConfig config) {
ClientOptions options = new ClientOptions();
for (IClientConfigKey key: IClientConfigKey.Keys.keys()) {
Object value = config.get(key);
if (value != null) {
options.options.put(key, value);
}
}
return options;
}

public ClientOptions withDiscoveryServiceIdentifier(String identifier) {
options.put(IClientConfigKey.Keys.DeploymentContextBasedVipAddresses, identifier);
return this;
Expand Down
20 changes: 0 additions & 20 deletions ribbon/src/main/java/com/netflix/ribbon/RequestTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package com.netflix.ribbon;

import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.ribbon.hystrix.FallbackHandler;

/**
* @author awang
*
Expand All @@ -32,24 +29,7 @@ public abstract class RequestTemplate<T, R> {

public abstract RequestTemplate<T, R> copy(String name);

public abstract RequestTemplate<T, R> withFallbackProvider(FallbackHandler<T> fallbackProvider);

public abstract RequestTemplate<T, R> withResponseValidator(ResponseValidator<R> transformer);

/**
* Calling this method will enable both Hystrix request cache and supplied external cache providers
* on the supplied cache key. Caller can explicitly disable Hystrix request cache by calling
* {@link #withHystrixProperties(com.netflix.hystrix.HystrixObservableCommand.Setter)}
*
* @param cacheKeyTemplate
* @return
*/
public abstract RequestTemplate<T, R> withRequestCacheKey(String cacheKeyTemplate);

public abstract RequestTemplate<T, R> withCacheProvider(String cacheKeyTemplate, CacheProvider<T> cacheProvider);

public abstract RequestTemplate<T, R> withHystrixProperties(HystrixObservableCommand.Setter setter);

public static abstract class RequestBuilder<T> {
public abstract RequestBuilder<T> withRequestProperty(String key, Object value);

Expand Down
Loading

0 comments on commit 4bd4aa3

Please sign in to comment.