Skip to content

Commit

Permalink
add rate limiter to protect server
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Jun 14, 2016
1 parent a80d117 commit 44e28a5
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 16 deletions.
Expand Up @@ -97,7 +97,7 @@ public void run() {
tryUpdateConfigServices();
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
m_configUtil.getRefreshIntervalTimeUnit());
}

private synchronized void updateConfigServices() {
Expand Down
Expand Up @@ -6,6 +6,7 @@
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.RateLimiter;

import com.ctrip.framework.apollo.Apollo;
import com.ctrip.framework.apollo.core.ConfigConsts;
Expand Down Expand Up @@ -60,9 +61,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final ExecutorService m_longPollingService;
private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
private SchedulePolicy m_longPollSuccessSchedulePolicyInMS;
private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult;
private RateLimiter m_longPollRateLimiter;
private RateLimiter m_loadConfigRateLimiter;

static {
m_executorService = Executors.newScheduledThreadPool(1,
Expand All @@ -87,12 +89,13 @@ public RemoteConfigRepository(String namespace) {
throw new ApolloConfigException("Unable to load component!", ex);
}
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollSuccessSchedulePolicyInMS = new ExponentialSchedulePolicy(100, 1000); //in millisecond
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>();
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
Expand All @@ -113,7 +116,7 @@ public void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {

private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshTimeUnit());
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
this.m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
Expand All @@ -124,7 +127,7 @@ public void run() {
Cat.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
m_configUtil.getRefreshIntervalTimeUnit());
}

@Override
Expand Down Expand Up @@ -158,6 +161,7 @@ private Properties transformApolloConfigToProperties(ApolloConfig apolloConfig)
}

private ApolloConfig loadApolloConfig() {
m_loadConfigRateLimiter.acquire();
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
Expand Down Expand Up @@ -281,8 +285,8 @@ private void doLongPollingRefresh(String appId, String cluster, String dataCente
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
m_longPollRateLimiter.acquire();
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
long sleepTime = 50; //default 50 ms
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
Expand Down Expand Up @@ -316,12 +320,8 @@ public void run() {
trySync();
}
});
m_longPollSuccessSchedulePolicyInMS.success();
}

if (response.getStatusCode() == 304) {
sleepTime = m_longPollSuccessSchedulePolicyInMS.fail();
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
Expand All @@ -333,14 +333,13 @@ public void run() {
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}",
sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
sleepTime = sleepTimeInSecond * 1000;
} finally {
transaction.complete();
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
Expand Down
Expand Up @@ -27,12 +27,15 @@ public class ConfigUtil {
private int connectTimeout = 5000; //5 seconds
private int readTimeout = 10000; //10 seconds
private String cluster;
private int loadConfigQPS = 2; //2 times per second
private int longPollQPS = 2; //2 times per second

public ConfigUtil() {
initRefreshInterval();
initConnectTimeout();
initReadTimeout();
initCluster();
initQPS();
}

/**
Expand Down Expand Up @@ -145,7 +148,35 @@ public int getRefreshInterval() {
return refreshInterval;
}

public TimeUnit getRefreshTimeUnit() {
public TimeUnit getRefreshIntervalTimeUnit() {
return refreshIntervalTimeUnit;
}

private void initQPS() {
String customizedLoadConfigQPS = System.getProperty("apollo.loadConfigQPS");
if (!Strings.isNullOrEmpty(customizedLoadConfigQPS)) {
try {
loadConfigQPS = Integer.parseInt(customizedLoadConfigQPS);
} catch (Throwable ex) {
logger.error("Config for apollo.loadConfigQPS is invalid: {}", customizedLoadConfigQPS);
}
}

String customizedLongPollQPS = System.getProperty("apollo.longPollQPS");
if (!Strings.isNullOrEmpty(customizedLongPollQPS)) {
try {
longPollQPS = Integer.parseInt(customizedLongPollQPS);
} catch (Throwable ex) {
logger.error("Config for apollo.longPollQPS is invalid: {}", customizedLongPollQPS);
}
}
}

public int getLoadConfigQPS() {
return loadConfigQPS;
}

public int getLongPollQPS() {
return longPollQPS;
}
}
Expand Up @@ -153,7 +153,7 @@ public int getRefreshInterval() {
}

@Override
public TimeUnit getRefreshTimeUnit() {
public TimeUnit getRefreshIntervalTimeUnit() {
return refreshTimeUnit;
}

Expand All @@ -166,6 +166,16 @@ public Env getApolloEnv() {
public String getDataCenter() {
return someDataCenter;
}

@Override
public int getLoadConfigQPS() {
return 200;
}

@Override
public int getLongPollQPS() {
return 200;
}
}

/**
Expand Down
Expand Up @@ -192,6 +192,16 @@ public String getCluster() {
public String getDataCenter() {
return null;
}

@Override
public int getLoadConfigQPS() {
return 200;
}

@Override
public int getLongPollQPS() {
return 200;
}
}

public static class MockConfigServiceLocator extends ConfigServiceLocator {
Expand Down

0 comments on commit 44e28a5

Please sign in to comment.