Bucket4j supports any grid solution that is compatible with the JCache API (JSR 107) specification.
Note
Do not forget to read the Distributed usage checklist before using Bucket4j over a JCache cluster.
To use the JCache extension you also need to add the following dependency:
<!-- For java 17 -->
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j_jdk17-jcache</artifactId>
<version>{revnumber}</version>
</dependency>
<!-- For java 11 -->
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j_jdk11-jcache</artifactId>
<version>{revnumber}</version>
</dependency>
JCache expects javax.cache.cache-api to be a provided dependency. Do not forget to add the following dependency:
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
Imagine that you are developing a Servlet-based web application and want to limit access on a per-IP basis, applying the same limit to every IP: 30 requests per minute.
A servlet filter would be the obvious place to check limits:
import static java.time.Duration.ofMinutes;

public class IpThrottlingFilter implements javax.servlet.Filter {
private static final BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(limit -> limit.capacity(30).refillGreedy(30, ofMinutes(1)))
.build();
// cache for storing token buckets, where IP is key.
@Inject
private javax.cache.Cache<String, byte[]> cache;
private ProxyManager<String> buckets;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// init bucket registry
buckets = Bucket4jJCache
.entryProcessorBasedBuilder(cache)
// setup optional parameters if necessary
.build();
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String ip = IpHelper.getIpFromRequest(httpRequest);
// acquire cheap proxy to the bucket
Bucket bucket = buckets.getProxy(ip, () -> configuration);
// tryConsume returns false immediately if the bucket has no tokens available
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
}
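To make the "30 requests per minute" limit above more concrete, here is a minimal single-JVM sketch of greedy refill, where tokens are returned one at a time (one token every 2 seconds for 30 tokens per minute) rather than all at once. The class name GreedyBucketSketch and its methods are hypothetical and written only for illustration; this is not Bucket4j's implementation, and it ignores distribution entirely. The clock is passed in as a parameter so the behavior can be checked without sleeping.

```java
import java.util.concurrent.TimeUnit;

/** Simplified illustration of a greedy-refill token bucket (hypothetical, not Bucket4j code). */
public class GreedyBucketSketch {
    private final long capacity;
    private final long nanosPerToken;
    private long availableTokens;
    private long lastRefillNanos;

    public GreedyBucketSketch(long capacity, long refillTokens, long refillPeriodNanos, long nowNanos) {
        this.capacity = capacity;
        // greedy refill: spread the refill evenly over the period
        this.nanosPerToken = refillPeriodNanos / refillTokens;
        this.availableTokens = capacity; // the bucket starts full
        this.lastRefillNanos = nowNanos;
    }

    /** Tries to take the given number of tokens at the given point in time. */
    public synchronized boolean tryConsume(long tokens, long nowNanos) {
        refill(nowNanos);
        if (availableTokens < tokens) {
            return false; // limit exceeded, nothing is consumed
        }
        availableTokens -= tokens;
        return true;
    }

    private void refill(long nowNanos) {
        long newTokens = (nowNanos - lastRefillNanos) / nanosPerToken;
        if (newTokens > 0) {
            availableTokens = Math.min(capacity, availableTokens + newTokens);
            // advance only by whole tokens so that partial progress is not lost
            lastRefillNanos += newTokens * nanosPerToken;
        }
    }

    public static void main(String[] args) {
        GreedyBucketSketch bucket =
                new GreedyBucketSketch(30, 30, TimeUnit.MINUTES.toNanos(1), 0L);
        int allowed = 0;
        for (int i = 0; i < 40; i++) {
            if (bucket.tryConsume(1, 0L)) {
                allowed++;
            }
        }
        System.out.println("allowed in initial burst: " + allowed);
        System.out.println("allowed after 2 seconds: "
                + bucket.tryConsume(1, TimeUnit.SECONDS.toNanos(2)));
    }
}
```

In the filter above the same decision is made by bucket.tryConsume(1), with the bucket state kept in the JCache grid instead of in a local field.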
Imagine that you provide a paid language translation service via HTTP. Each user has a unique agreement that differs from the others. The details of each agreement are stored in a relational database and take significant time to fetch (for example, 100 ms). The example above will not work well in this case, because creating or fetching the bucket configuration from the database would be 100 times slower than the limit check itself. Bucket4j solves this problem with lazy configuration suppliers, which are called if and only if the bucket is not yet stored in the grid. This makes it possible to implement a solution that reads the agreement from the database once per user.
import static java.time.Duration.ofDays;

public class IpThrottlingFilter implements javax.servlet.Filter {
// service to provide per user limits
@Inject
private LimitProvider limitProvider;
// cache for storing token buckets, where user ID is key.
@Inject
private javax.cache.Cache<String, byte[]> cache;
private ProxyManager<String> buckets;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// init bucket registry
buckets = Bucket4jJCache
.entryProcessorBasedBuilder(cache)
// setup optional parameters if necessary
.build();
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String userId = AuthenticationHelper.getUserIdFromRequest(httpRequest);
// prepare a configuration supplier, which is called on the first interaction with the proxy only if the bucket has not been saved yet
Supplier<BucketConfiguration> configurationLazySupplier = getConfigSupplierForUser(userId);
// acquire cheap proxy to the bucket
Bucket bucket = buckets.getProxy(userId, configurationLazySupplier);
// tryConsume returns false immediately if the bucket has no tokens available
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
private Supplier<BucketConfiguration> getConfigSupplierForUser(String userId) {
return () -> {
long translationsPerDay = limitProvider.readPerDayLimitFromAgreementsDatabase(userId);
return BucketConfiguration.builder()
.addLimit(limit -> limit.capacity(translationsPerDay).refillGreedy(translationsPerDay, ofDays(1)))
.build();
};
}
}
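The "called if and only if the bucket was not yet stored" behavior of the lazy supplier can be illustrated with a small sketch that uses only the JDK. Here a ConcurrentHashMap stands in for the grid and a counter stands in for the slow agreements database; LazyConfigSketch, limitFor and readLimitFromDatabase are hypothetical names invented for this illustration and are not part of Bucket4j. In a real grid an entry may also be evicted or expire, in which case the supplier would be consulted again; the sketch does not model that.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Illustrates lazy, once-per-key configuration loading (hypothetical, not Bucket4j code). */
public class LazyConfigSketch {
    // stands in for the grid that stores the per-user bucket state
    private final Map<String, Long> storedLimits = new ConcurrentHashMap<>();
    // counts how often the simulated database is queried
    final AtomicInteger databaseReads = new AtomicInteger();

    /** Returns the stored limit, invoking the supplier only when nothing is stored for the user. */
    long limitFor(String userId, Supplier<Long> lazySupplier) {
        return storedLimits.computeIfAbsent(userId, id -> lazySupplier.get());
    }

    /** Simulated slow lookup of the per-day limit from the user's agreement. */
    long readLimitFromDatabase(String userId) {
        databaseReads.incrementAndGet();
        return 1_000L;
    }

    public static void main(String[] args) {
        LazyConfigSketch sketch = new LazyConfigSketch();
        for (int i = 0; i < 5; i++) {
            // a supplier is created for every request, but it is invoked at most once per user
            sketch.limitFor("user-1", () -> sketch.readLimitFromDatabase("user-1"));
        }
        sketch.limitFor("user-2", () -> sketch.readLimitFromDatabase("user-2"));
        System.out.println("database reads: " + sketch.databaseReads.get());
    }
}
```

Creating the supplier on every request is cheap because it only captures the user ID; the expensive database call happens inside the supplier body, which is why the filter above can build a new supplier for each request.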
Why is the JCache specification not enough in modern stacks, and why were dedicated modules for Infinispan, Hazelcast, Coherence and Ignite introduced in 3.0?
Asynchronous processing is very important for high-throughput applications, but the JCache specification does not define an asynchronous API, because two early attempts to bring this kind of functionality to the spec level (307, 312) failed for lack of consensus.
Also, implementing asynchronous support for any JCache provider outside the list above should be an easy exercise, so feel free to submit a pull request that covers your favorite JCache provider.
Important
Keep in mind that there are many non-certified implementations of the JCache specification on the market. Many of them want to increase their popularity by declaring support for the JCache API, but often only the API is supported and the semantics of JCache are ignored entirely. Using Bucket4j with this kind of library should be completely avoided.
Bucket4j is only compatible with implementations that obey the rules of the JCache specification (especially those related to EntryProcessor execution). Oracle Coherence, Apache Ignite and Hazelcast are good examples of safe JCache implementations.
Important
Because it is impossible to test all possible JCache providers, you need to test your provider yourself.
Run the following code to make sure that your JCache implementation provides good isolation for EntryProcessors:
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import java.util.concurrent.CountDownLatch;
import java.io.Serializable;
public class CompatibilityTest {
final Cache<String, Integer> cache;
public CompatibilityTest(Cache<String, Integer> cache) {
this.cache = cache;
}
public void test() throws InterruptedException {
String key = "42";
int threads = 4;
int iterations = 1000;
cache.put(key, 0);
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
try {
for (int j = 0; j < iterations; j++) {
EntryProcessor<String, Integer, Void> processor = (EntryProcessor<String, Integer, Void> & Serializable) (mutableEntry, objects) -> {
int value = mutableEntry.getValue();
mutableEntry.setValue(value + 1);
return null;
};
cache.invoke(key, processor);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
int value = cache.get(key);
if (value == threads * iterations) {
System.out.println("Implementation which you use is compatible with Bucket4j");
} else {
String msg = "Implementation which you use is not compatible with Bucket4j";
msg += ", " + (threads * iterations - value) + " writes are missed";
throw new IllegalStateException(msg);
}
}
}
The check performs 4000 integer increments in parallel and verifies that no update has been missed. If the check passes, your JCache provider is compatible with Bucket4j and throttling will work correctly in a distributed and concurrent environment. If the check fails, reach out to the team behind that JCache provider and ask why its implementation loses writes.
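To show what a missed write looks like, the following JDK-only sketch contrasts a non-isolated read-then-write sequence with an atomic read-modify-write. A ConcurrentHashMap stands in for the cache, and its compute method plays the role that an isolated EntryProcessor plays in a compliant JCache provider; this analogy is an assumption made for illustration, and LostUpdateSketch is a hypothetical name. The unsafe case is interleaved by hand so that the lost update is reproduced deterministically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Contrasts a lost update with an atomic increment (hypothetical illustration). */
public class LostUpdateSketch {

    /** Two clients read the same value and both write back "read + 1", so one increment is lost. */
    static int unsafeInterleaving() {
        Map<String, Integer> cache = new ConcurrentHashMap<>();
        cache.put("42", 0);
        int readByClientA = cache.get("42");
        int readByClientB = cache.get("42"); // B reads before A has written
        cache.put("42", readByClientA + 1);
        cache.put("42", readByClientB + 1);  // overwrites A's increment
        return cache.get("42");
    }

    /** Every increment runs as one atomic read-modify-write, so no update can be lost. */
    static int atomicIncrements(int threads, int iterations) {
        Map<String, Integer> cache = new ConcurrentHashMap<>();
        cache.put("42", 0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    cache.compute("42", (key, value) -> value + 1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return cache.get("42");
    }

    public static void main(String[] args) {
        System.out.println("two unsafe increments yield: " + unsafeInterleaving());
        System.out.println("4 x 1000 atomic increments yield: " + atomicIncrements(4, 1000));
    }
}
```

A provider that executes EntryProcessors without proper isolation behaves like the first method under concurrency, which is exactly the situation the compatibility check is designed to detect.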