Navigation Menu

Skip to content

Commit

Permalink
1877262
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jan 22, 2008
1 parent e465fb9 commit 203d0b4
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 18 deletions.
Expand Up @@ -21,6 +21,8 @@
package org.jumpmind.symmetric.web;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
Expand All @@ -29,6 +31,7 @@
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
Expand All @@ -43,36 +46,72 @@ public class NodeConcurrencyFilter implements Filter {

private ServletContext context;

protected int maxNumberOfConcurrentWorkers = 20;

protected long waitTimeBetweenRetriesInMs = 1000;

public void destroy() {
}

static int numberOfWorkers;

public void doFilter(ServletRequest req, ServletResponse resp,
FilterChain chain) throws IOException, ServletException {
try {
numberOfWorkers++;
if (numberOfWorkers > getMaxNumberOfWorkers()) {
((HttpServletResponse) resp)
.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
logger
.error("Symmetric request was rejected because the server was too busy.");
} else {
static Map<String, Integer> numberOfWorkersByServlet = new HashMap<String, Integer>();

public void doFilter(final ServletRequest req, final ServletResponse resp, final FilterChain chain)
throws IOException, ServletException {
String servletPath = ((HttpServletRequest) req).getServletPath();
if (!doWork(servletPath, new IWorker() {
public void work() throws ServletException, IOException {
chain.doFilter(req, resp);
}
} finally {
numberOfWorkers--;
})) {
((HttpServletResponse) resp).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}

}

protected boolean doWork(String servletPath, IWorker worker) throws ServletException, IOException {
boolean didWork = false;
int tries = 5;
int numberOfWorkers;
do {
numberOfWorkers = getNumberOfWorkers(servletPath);
if (numberOfWorkers < maxNumberOfConcurrentWorkers) {
try {
changeNumberOfWorkers(servletPath, 1);
worker.work();
didWork = true;
} finally {
changeNumberOfWorkers(servletPath, -1);
}
} else if (tries == 0) {
logger.warn("Symmetric request was rejected because the server was too busy.");
} else {
tries--;
try {
Thread.sleep(waitTimeBetweenRetriesInMs);
} catch (InterruptedException ex) {
}
}
} while (numberOfWorkers >= maxNumberOfConcurrentWorkers && tries > 0);
return didWork;
}

private int getNumberOfWorkers(String servletPath) {
Integer number = numberOfWorkersByServlet.get(servletPath);
return number == null ? 0 : number;
}

synchronized private void changeNumberOfWorkers(String servletPath, int delta) {
numberOfWorkersByServlet.put(servletPath, getNumberOfWorkers(servletPath) + delta);
}

public void init(FilterConfig config) throws ServletException {
context = config.getServletContext();
ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context);
maxNumberOfConcurrentWorkers = (Integer) ctx.getBean(Constants.MAX_CONCURRENT_WORKERS);
}

private int getMaxNumberOfWorkers() {
ApplicationContext ctx = WebApplicationContextUtils
.getWebApplicationContext(context);
return (Integer) ctx.getBean(Constants.MAX_CONCURRENT_WORKERS);
interface IWorker {
public void work() throws ServletException, IOException;
}

}
@@ -0,0 +1,91 @@
package org.jumpmind.symmetric.web;

import java.io.IOException;

import javax.servlet.ServletException;

import org.testng.Assert;
import org.testng.annotations.Test;

public class NodeConcurrencyFilterTest {

@Test(groups = "continuous")
public void testFilter() throws Exception {
NodeConcurrencyFilter filter = new NodeConcurrencyFilter();
filter.maxNumberOfConcurrentWorkers = 2;
filter.waitTimeBetweenRetriesInMs = 0;

MockWorker one = new MockWorker(filter,"push");
MockWorker two = new MockWorker(filter,"push");
MockWorker three = new MockWorker(filter,"push");
MockWorker other = new MockWorker(filter,"pull");

one.start();
Thread.sleep(500);

two.start();
Thread.sleep(500);

three.start();
Thread.sleep(500);

other.start();
Thread.sleep(500);


one.hold = false;
three.hold = false;
other.hold = false;

Thread.sleep(500);

Assert.assertEquals(one.success, true);
Assert.assertEquals(three.success, false);
Assert.assertEquals(other.success, true);

MockWorker four = new MockWorker(filter,"push");
four.start();
Thread.sleep(500);

two.hold = false;
four.hold = false;

Thread.sleep(500);

Assert.assertEquals(two.success, true);
Assert.assertEquals(four.success, true);

}

class MockWorker extends Thread {

String servletPath;
boolean hold = true;

NodeConcurrencyFilter filter;

boolean success;

MockWorker(NodeConcurrencyFilter filter, String path) {
this.filter = filter;
this.servletPath = path;
}

public void run() {
try {
success = filter.doWork(servletPath, new NodeConcurrencyFilter.IWorker() {
public void work() throws ServletException, IOException {
while (hold) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
});
} catch (Exception ex) {
throw new RuntimeException();
}
}
}
}

0 comments on commit 203d0b4

Please sign in to comment.