Skip to content

Commit

Permalink
Issue Netflix#142 work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Allen Wang committed Aug 8, 2014
1 parent 47c9e16 commit 3d4211e
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,6 @@
*/
package com.netflix.client.netty.http;

import static com.netflix.ribbon.testutils.TestUtils.waitUntilTrueOrTimeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.contexts.ContextsContainer;
import io.reactivex.netty.contexts.ContextsContainerImpl;
import io.reactivex.netty.contexts.MapBackedKeySupplier;
import io.reactivex.netty.contexts.RxContexts;
import io.reactivex.netty.protocol.http.client.HttpClient.HttpClientConfig;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import io.reactivex.netty.servo.http.HttpClientListener;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

import com.google.common.collect.Lists;
import com.google.mockwebserver.MockResponse;
import com.google.mockwebserver.MockWebServer;
Expand All @@ -82,6 +42,40 @@
import com.sun.jersey.api.container.httpserver.HttpServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.net.httpserver.HttpServer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.contexts.ContextsContainer;
import io.reactivex.netty.contexts.ContextsContainerImpl;
import io.reactivex.netty.contexts.MapBackedKeySupplier;
import io.reactivex.netty.contexts.RxContexts;
import io.reactivex.netty.protocol.http.client.HttpClient.HttpClientConfig;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import io.reactivex.netty.servo.http.HttpClientListener;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static com.netflix.ribbon.testutils.TestUtils.waitUntilTrueOrTimeout;
import static org.junit.Assert.*;

public class NettyClientTest {

Expand Down Expand Up @@ -397,7 +391,8 @@ public Boolean call() {
}
});
assertEquals(0, listener.getPoolReuse());

assertEquals(1, listener.getLiveConnections());
assertEquals(4, listener.getPoolEvictions());
// two requests to bad server because retry same server is set to 1
assertEquals(4, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
Expand Down Expand Up @@ -476,6 +471,7 @@ public Boolean call() {
}
});
assertEquals(2, listener.getConnectionCount());
assertEquals(2, listener.getLiveConnections());
assertEquals(0, listener.getPoolReuse());

assertEquals(2, externalListener.getPoolAcquires());
Expand Down
14 changes: 13 additions & 1 deletion ribbon/src/main/java/com/netflix/ribbon/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.ribbon;

import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;

import java.util.Map;
Expand All @@ -37,7 +38,18 @@ private ClientOptions() {
public static ClientOptions create() {
return new ClientOptions();
}


public static ClientOptions from(IClientConfig config) {
ClientOptions options = new ClientOptions();
for (IClientConfigKey key: IClientConfigKey.Keys.keys()) {
Object value = config.get(key);
if (value != null) {
options.options.put(key, value);
}
}
return options;
}

public ClientOptions withDiscoveryServiceIdentifier(String identifier) {
options.put(IClientConfigKey.Keys.DeploymentContextBasedVipAddresses, identifier);
return this;
Expand Down
17 changes: 8 additions & 9 deletions ribbon/src/main/java/com/netflix/ribbon/ResourceGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ public abstract class ResourceGroup<T extends RequestTemplate<?, ?>> {
private String name;
private IClientConfig clientConfig;

public ResourceGroup(String name) {
this(name, null, ClientConfigFactory.DEFAULT, RibbonTransportFactory.DEFAULT);
public static abstract class TemplateBuilder<T extends RequestTemplate> {
public abstract T build();
}

public ResourceGroup(String name, ClientOptions options, ClientConfigFactory configFactory, RibbonTransportFactory transportFactory) {
protected ResourceGroup(String name) {
this(name, ClientOptions.create(), ClientConfigFactory.DEFAULT, RibbonTransportFactory.DEFAULT);
}

protected ResourceGroup(String name, ClientOptions options, ClientConfigFactory configFactory, RibbonTransportFactory transportFactory) {
this.name = name;
clientConfig = configFactory.newConfig();
clientConfig.loadProperties(name);
Expand All @@ -37,11 +41,6 @@ public ResourceGroup(String name, ClientOptions options, ClientConfigFactory con
}
}
}

public ResourceGroup(String name, IClientConfig clientConfig, RibbonTransportFactory transportFactory) {
this.name = name;
this.clientConfig = clientConfig;
}

protected final IClientConfig getClientConfig() {
return clientConfig;
Expand All @@ -51,5 +50,5 @@ public final String name() {
return name;
}

public abstract <S> T newRequestTemplate(String name, Class<? extends S> classType);
public abstract <S> TemplateBuilder newTemplateBuilder(String name, Class<? extends S> classType);
}
25 changes: 8 additions & 17 deletions ribbon/src/main/java/com/netflix/ribbon/RibbonResourceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
package com.netflix.ribbon;

import com.netflix.client.config.ClientConfigFactory;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.ribbon.http.HttpResourceGroup;
import com.netflix.ribbon.http.HttpResourceGroup.Builder;
import com.netflix.ribbon.proxy.RibbonDynamicProxy;

/**
Expand All @@ -38,26 +37,18 @@ public RibbonResourceFactory(ClientConfigFactory configFactory, RibbonTransportF
this.transportFactory = transportFactory;
}

public HttpResourceGroup createHttpResourceGroup(IClientConfig config) {
return new HttpResourceGroup(config.getClientName(), config, transportFactory);
public Builder createHttpResourceGroupBuilder(String name) {
Builder builder = HttpResourceGroup.Builder.newBuilder(name, clientConfigFactory, transportFactory);
return builder;
}

public <T> T from(Class<T> classType) {
return RibbonDynamicProxy.newInstance(classType, this, clientConfigFactory, transportFactory);
}

public HttpResourceGroup createHttpResourceGroup(String name, ClientOptions options) {
return createHttpResourceGroup(getClientConfigFromOptions(name, options));
}

protected final IClientConfig getClientConfigFromOptions(String name, ClientOptions options) {
IClientConfig config = clientConfigFactory.newConfig();
config.loadProperties(name);
if (options != null) {
for (IClientConfigKey key: options.getOptions().keySet()) {
config.set(key, options.getOptions().get(key));
}
}
return config;
public Builder createHttpResourceGroup(String name, ClientOptions options) {
Builder builder = Builder.newBuilder(name, clientConfigFactory, transportFactory);
builder.withClientOptions(options);
return builder;
}
}
111 changes: 92 additions & 19 deletions ribbon/src/main/java/com/netflix/ribbon/http/HttpRequestTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
*/
package com.netflix.ribbon.http;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;

import java.util.HashMap;
import java.util.Map;

import com.netflix.client.netty.LoadBalancingRxClient;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
Expand All @@ -33,9 +23,19 @@
import com.netflix.hystrix.HystrixObservableCommand.Setter;
import com.netflix.ribbon.CacheProvider;
import com.netflix.ribbon.RequestTemplate;
import com.netflix.ribbon.ResourceGroup.TemplateBuilder;
import com.netflix.ribbon.ResponseValidator;
import com.netflix.ribbon.hystrix.FallbackHandler;
import com.netflix.ribbon.template.ParsedTemplate;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;

import java.util.HashMap;
import java.util.Map;

/**
* Provides API to construct a request template for HTTP resource.
Expand All @@ -53,6 +53,80 @@ public class HttpRequestTemplate<T> extends RequestTemplate<T, HttpClientRespons
public static final String CACHE_HYSTRIX_COMMAND_SUFFIX = "_cache";
public static final int DEFAULT_CACHE_TIMEOUT = 20;

public static class Builder<T> extends TemplateBuilder<HttpRequestTemplate<T>> {
private String name;
private HttpResourceGroup resourceGroup;
private Class<? extends T> classType;
private FallbackHandler<T> fallbackHandler;
private String method;
private ParsedTemplate parsedUriTemplate;
private String cacheKeyTemplate;
private CacheProviderWithKeyTemplate<T> cacheProvider;
private HttpHeaders headers;
private Setter setter;
private Map<String, ParsedTemplate> parsedTemplates;

private Builder(String name, HttpResourceGroup resourceGroup, Class<? extends T> classType) {
this.name = name;
this.resourceGroup = resourceGroup;
this.classType = classType;
}

private ParsedTemplate createParsedTemplate(String template) {
ParsedTemplate parsedTemplate = parsedTemplates.get(template);
if (parsedTemplate == null) {
parsedTemplate = ParsedTemplate.create(template);
parsedTemplates.put(template, parsedTemplate);
}
return parsedTemplate;
}

public static <T> Builder<T> newBuilder(String templateName, HttpResourceGroup group, Class<? extends T> classType) {
return new Builder(templateName, group, classType);
}

public Builder<T> withFallbackProvider(FallbackHandler<T> fallbackHandler) {
this.fallbackHandler = fallbackHandler;
return this;
}

public Builder<T> withMethod(String method) {
this.method = method;
return this;
}

public Builder<T> withUriTemplate(String uriTemplate) {
this.parsedUriTemplate = createParsedTemplate(uriTemplate);
return this;
}

public Builder<T> withRequestCacheKey(String cacheKeyTemplate) {
this.cacheKeyTemplate = cacheKeyTemplate;
return this;
}

public Builder<T> withCacheProvider(String keyTemplate, CacheProvider<T> cacheProvider) {
ParsedTemplate template = createParsedTemplate(keyTemplate);
this.cacheProvider = new CacheProviderWithKeyTemplate<T>(template, cacheProvider);
return this;
}

public Builder<T> withHeader(String name, String value) {
headers.add(name, value);
return this;
}

public Builder<T> withHystrixProperties(
Setter propertiesSetter) {
this.setter = setter;
return this;
}

public HttpRequestTemplate<T> build() {
return null;
}
}

private final HttpClient<ByteBuf, ByteBuf> client;
private final String clientName;
private final int maxResponseTime;
Expand Down Expand Up @@ -87,12 +161,19 @@ public final CacheProvider<T> getProvider() {
}
}

HttpRequestTemplate(String name, HttpResourceGroup group, Class<? extends T> classType, HystrixObservableCommand.Setter setter,
HttpMethod method, HttpHeaders headers, String uriTemplate,
FallbackHandler<T> fallbackHandler, ResponseValidator<HttpClientResponse<ByteBuf>> validator, CacheProviderWithKeyTemplate<T> cacheProvider,
String hystrixCacheKey) {

}

public HttpRequestTemplate(String name, HttpResourceGroup group, Class<? extends T> classType) {
this.client = group.getClient();
this.classType = classType;
clientName = client.name();
if (client instanceof LoadBalancingRxClient) {
LoadBalancingRxClient<?, ? ,?> ribbonClient = (LoadBalancingRxClient<?, ? ,?>) client;
LoadBalancingRxClient ribbonClient = (LoadBalancingRxClient) client;
maxResponseTime = ribbonClient.getResponseTimeOut();
concurrentRequestLimit = ribbonClient.getMaxConcurrentRequests();
} else {
Expand Down Expand Up @@ -141,14 +222,6 @@ public HttpRequestTemplate<T> withMethod(String method) {
return this;
}

private ParsedTemplate createParsedTemplate(String template) {
ParsedTemplate parsedTemplate = parsedTemplates.get(template);
if (parsedTemplate == null) {
parsedTemplate = ParsedTemplate.create(template);
parsedTemplates.put(template, parsedTemplate);
}
return parsedTemplate;
}

public HttpRequestTemplate<T> withUriTemplate(String uri) {
this.parsedUriTemplate = createParsedTemplate(uri);
Expand Down
Loading

0 comments on commit 3d4211e

Please sign in to comment.