Skip to content

Commit

Permalink
Merge pull request #137 from allenxwang/cp
Browse files Browse the repository at this point in the history
Fix #136
  • Loading branch information
Allen Wang committed Jul 17, 2014
2 parents 435246d + baa772f commit b35e376
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 52 deletions.
10 changes: 7 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ project(':ribbon-loadbalancer') {
dependencies {
compile project(':ribbon-core')
compile 'com.netflix.netflix-commons:netflix-statistics:0.1.1'
compile 'com.netflix.rxjava:rxjava-core:[0.17,)'
compile 'com.netflix.rxjava:rxjava-core:0.18+'
}
}

Expand Down Expand Up @@ -102,7 +102,10 @@ project(':ribbon-examples') {
project(':ribbon-test') {
dependencies {
compile project(':ribbon-core')
compile 'com.netflix.rxjava:rxjava-core:[0.17,)'
compile project(':ribbon-eureka')
compile 'org.powermock:powermock-easymock-release-full:1.5.4'
compile 'org.easymock:easymock:3.2'
compile 'com.netflix.rxjava:rxjava-core:0.18+'
compile 'com.sun.jersey:jersey-server:1.11'
compile 'javax.ws.rs:jsr311-api:1.1.1'
compile 'com.sun.jersey:jersey-core:1.11'
Expand Down Expand Up @@ -143,11 +146,12 @@ project(':ribbon') {
compile 'com.netflix.hystrix:hystrix-core:1.4.0-RC4'
compile 'com.netflix.evcache:evcache-client:1.0.5'
compile project(':ribbon-transport')
testCompile project(':ribbon-test')
testCompile 'com.google.mockwebserver:mockwebserver:20130706'

}
}

task wrapper(type: Wrapper) {
gradleVersion = '1.12'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public Builder withServerListRefreshIntervalMills(int value) {
config.set(CommonClientConfigKey.ServerListRefreshInterval, value);
return this;
}

public Builder withZoneAffinityEnabled(boolean value) {
config.set(CommonClientConfigKey.EnableZoneAffinity, value);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,20 @@
*/
package com.netflix.loadbalancer;

import java.util.Date;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -29,19 +41,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* A LoadBalancer that has the capabilities to obtain the candidate list of
* servers using a dynamic source. i.e. The list of servers can potentially be
Expand Down Expand Up @@ -171,7 +170,7 @@ void restOfInit(IClientConfig clientConfig) {
.primeConnections(getServerList(true));
}
this.setEnablePrimingConnections(primeConnection);

LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;

public class LoadBalancerBuilder<T extends Server> {

Expand Down Expand Up @@ -57,35 +58,61 @@ public LoadBalancerBuilder<T> withServerListFilter(ServerListFilter<T> serverLis

public BaseLoadBalancer buildFixedServerListLoadBalancer(List<T> servers) {
if (rule == null) {
rule = createDefaultRule(config);
rule = createRuleFromConfig(config);
}
BaseLoadBalancer lb = new BaseLoadBalancer(config, rule, ping);
lb.setServersList(servers);
return lb;
}

private static IRule createDefaultRule(IClientConfig config) {
AvailabilityFilteringRule rule = new AvailabilityFilteringRule();
rule.initWithNiwsConfig(config);
private static IRule createRuleFromConfig(IClientConfig config) {
String ruleClassName = config.get(IClientConfigKey.Keys.NFLoadBalancerRuleClassName);
if (ruleClassName == null) {
throw new IllegalArgumentException("NFLoadBalancerRuleClassName is not specified in the config");
}
IRule rule;
try {
rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClassName, config);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rule;
}

private static ServerList<Server> createDefaultServerList(IClientConfig config) {
ConfigurationBasedServerList list = new ConfigurationBasedServerList();
list.initWithNiwsConfig(config);
private static ServerList<Server> createServerListFromConfig(IClientConfig config) {
String serverListClassName = config.get(IClientConfigKey.Keys.NIWSServerListClassName);
if (serverListClassName == null) {
throw new IllegalArgumentException("NIWSServerListClassName is not specified in the config");
}
ServerList<Server> list;
try {
list = (ServerList<Server>) ClientFactory.instantiateInstanceWithClientConfig(serverListClassName, config);
} catch (Exception e) {
throw new RuntimeException(e);
}
return list;
}

/**
* Build a {@link ZoneAwareLoadBalancer} with a dynamic {@link ServerList} and an {@link IRule}. The {@link ServerList} can be
* either set in the {@link #withDynamicServerList(ServerList)} or in the {@link IClientConfig} using {@link CommonClientConfigKey#NIWSServerListClassName}.
* The {@link IRule} can be either set by {@link #withRule(IRule)} or in the {@link IClientConfig} using
* {@link CommonClientConfigKey#NFLoadBalancerRuleClassName}.
*/
public ZoneAwareLoadBalancer<T> buildDynamicServerListLoadBalancer() {
if (serverListImpl == null) {
serverListImpl = createDefaultServerList(config);
serverListImpl = createServerListFromConfig(config);
}
if (rule == null) {
rule = createDefaultRule(config);
rule = createRuleFromConfig(config);
}
return new ZoneAwareLoadBalancer<T>(config, rule, ping, serverListImpl, serverListFilter);
}

/**
* Build a load balancer using the configuration from the {@link IClientConfig} only. It uses reflection to initialize necessary load balancer
* components.
*/
public ILoadBalancer buildLoadBalancerFromConfigWithReflection() {
String loadBalancerClassName = config.get(CommonClientConfigKey.NFLoadBalancerClassName);
if (loadBalancerClassName == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
package com.netflix.loadbalancer;

import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;

import com.netflix.client.ClientException;
import com.netflix.client.DefaultLoadBalancerRetryHandler;
import com.netflix.client.RetryHandler;
import com.netflix.client.config.IClientConfig;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.utils.RxUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.observers.SafeSubscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.SafeSubscriber;
import rx.subscriptions.SerialSubscription;

import com.netflix.client.ClientException;
import com.netflix.client.DefaultLoadBalancerRetryHandler;
import com.netflix.client.RetryHandler;
import com.netflix.client.config.IClientConfig;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.utils.RxUtils;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Provides APIs to execute and retry tasks on a server chosen by the associated load balancer.
Expand Down Expand Up @@ -265,7 +260,7 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
logger.debug("Got error %s when executed on server %s", e, server);
logger.debug("Got error {} when executed on server {}", e, server);
recordStats(entity, e);
int maxRetries = errorHandler.getMaxRetriesOnSameServer();
boolean shouldRetry = maxRetries > 0 && errorHandler.isRetriableException(e, true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.ribbon.testutils;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.loadbalancer.Server;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.ArrayList;
import java.util.List;

import static org.easymock.EasyMock.expect;
import static org.powermock.api.easymock.PowerMock.createMock;
import static org.powermock.api.easymock.PowerMock.replay;

@RunWith(PowerMockRunner.class)
@PrepareForTest( {DiscoveryManager.class, DiscoveryClient.class} )
@PowerMockIgnore({"javax.management.*", "com.sun.jersey.*", "com.sun.*", "org.apache.*", "weblogic.*", "com.netflix.config.*", "com.sun.jndi.dns.*",
"javax.naming.*", "com.netflix.logging.*", "javax.ws.*"})

@Ignore
public abstract class MockedDiscoveryServerListTest {
protected abstract List<Server> getMockServerList();

protected abstract String getVipAddress();


static List<InstanceInfo> getDummyInstanceInfo(String appName, List<Server> serverList){
List<InstanceInfo> list = new ArrayList<InstanceInfo>();
for (Server server: serverList) {
InstanceInfo info = InstanceInfo.Builder.newBuilder().setAppName(appName)
.setHostName(server.getHost())
.setPort(server.getPort())
.build();
list.add(info);
}
return list;
}

@Before
public void setupMock(){
List<InstanceInfo> instances = getDummyInstanceInfo("dummy", getMockServerList());
PowerMock.mockStatic(DiscoveryManager.class);
PowerMock.mockStatic(DiscoveryClient.class);

DiscoveryClient mockedDiscoveryClient = createMock(DiscoveryClient.class);
DiscoveryManager mockedDiscoveryManager = createMock(DiscoveryManager.class);

expect(DiscoveryClient.getZone((InstanceInfo) EasyMock.anyObject())).andReturn("dummyZone").anyTimes();
expect(DiscoveryManager.getInstance()).andReturn(mockedDiscoveryManager).anyTimes();
expect(mockedDiscoveryManager.getDiscoveryClient()).andReturn(mockedDiscoveryClient).anyTimes();

expect(mockedDiscoveryClient.getInstancesByVipAddress(getVipAddress(), false, null)).andReturn(instances).anyTimes();

replay(DiscoveryManager.class);
replay(DiscoveryClient.class);
replay(mockedDiscoveryManager);
replay(mockedDiscoveryClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.netflix.client.ssl.URLSslContextFactory;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.LoadBalancerBuilder;
import com.netflix.loadbalancer.LoadBalancerExecutor;
import com.netflix.loadbalancer.LoadBalancerObservableCommand;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerStats;
Expand Down Expand Up @@ -97,7 +98,7 @@ public NettyHttpClient(
RetryHandler retryHandler,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ScheduledExecutorService poolCleanerScheduler) {
this(LoadBalancerBuilder.newBuilder().withClientConfig(config).buildDynamicServerListLoadBalancer(),
this(LoadBalancerBuilder.newBuilder().withClientConfig(config).buildLoadBalancerFromConfigWithReflection(),
config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}

Expand Down Expand Up @@ -299,6 +300,10 @@ HttpClientListener getListener() {
return (HttpClientListener) listener;
}

LoadBalancerExecutor getLoadBalancerExecutor() {
return lbExecutor;
}

@Override
protected MetricEventsListener<? extends ClientMetricsEvent<?>> createListener(
String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.client.netty.http;

import com.google.common.collect.Lists;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.netty.RibbonTransport;
import com.netflix.loadbalancer.LoadBalancerExecutor;
import com.netflix.loadbalancer.Server;
import com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList;
import com.netflix.ribbon.testutils.MockedDiscoveryServerListTest;
import io.netty.buffer.ByteBuf;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class DiscoveryLoadBalancerTest extends MockedDiscoveryServerListTest {

@Override
protected List<Server> getMockServerList() {
return Lists.newArrayList(new Server("www.google.com", 80), new Server("www.microsoft.com", 80));
}

@Override
protected String getVipAddress() {
return "myvip";
}

@Test
public void testLoadBalancer() {
IClientConfig config = IClientConfig.Builder.newBuilder().withDefaultValues()
.withDeploymentContextBasedVipAddresses(getVipAddress()).build()
.set(IClientConfigKey.Keys.NIWSServerListClassName, DiscoveryEnabledNIWSServerList.class.getName());
NettyHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(config);
LoadBalancerExecutor lbExecutor = client.getLoadBalancerExecutor();
List<Server> serverList = lbExecutor.getLoadBalancer().getServerList(false);
assertEquals(getMockServerList(), serverList);
}
}
Loading

0 comments on commit b35e376

Please sign in to comment.