Skip to content

Commit

Permalink
Add reactive cache implementation based on caffeine (#1)
Browse files Browse the repository at this point in the history
* refactor: replace infinite loop with Flux.interval()

* test: add validation for properties auto-configuration

* feat: add caffeine reactive cache

* update README

* commit badge

---------

Co-authored-by: GangCheng <gang.cheng@lanmudata.com>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 21, 2023
1 parent 29aab40 commit 0f66f43
Show file tree
Hide file tree
Showing 18 changed files with 786 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/badges/jacoco.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
* This project is compatible with `reactivestream` aka [`project-reactor`](https://projectreactor.io/)
* This project could integrate with `Spring Framework` (version>= 2.x)
* This project implement reactive cache for business application scenarios. The default implementation includes the following:
* `InmemeoryReactiveCache` uses `java.util.DelayQueue` to implement cache behavior
* `RedisReactiveCache` uses `redis` and `spring-boot-redis` to implement cache behavior
* `DefaultReactiveCache` uses customized configuration to implement cache behavior
* `InmemeoryReactiveCache` uses `java.util.concurrent.DelayQueue` to implement cache behavior
* `CaffeineReactiveCache` uses `com.github.benmanes.caffeine.cache.Cache` to implement cache behavior
* `RedisReactiveCache` uses `redis` and `spring-boot-data-redis` to implement cache behavior
* `DefaultReactiveCache` uses customized configuration which specific configured by uses to implement cache behavior

#### Usage

Expand All @@ -27,6 +28,15 @@ ReactiveCacheManager reactiveCacheManager = ReactiveCacheManagerBuilder.newInmem
.build();
```

> CaffeineReactiveCache
```java
ReactiveCacheManager reactiveCacheManager = ReactiveCacheManagerBuilder.newCaffeineReactiveManagerBuilder()
.withMaxWaitingDuration(Duration.ofSeconds(5))
.build();
```


> RedisReactiveCache
```java
Expand Down
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pro.chenggang.project.reactive.cache.support.configuration;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -28,6 +29,7 @@
@ConditionalOnClass({Flux.class, Mono.class})
@AutoConfiguration
@Configuration
@EnableAutoConfiguration
public class ReactiveCacheAutoConfiguration {

@ConfigurationProperties(prefix = PREFIX)
Expand All @@ -45,6 +47,15 @@ public ReactiveCacheManager inmemoryReactiveCacheManager(ReactiveCacheSupportPro
.build();
}

@ConditionalOnProperty(prefix = PREFIX, value = "type", havingValue = "caffeine")
@ConditionalOnMissingBean(ReactiveCacheManager.class)
@Bean
public ReactiveCacheManager caffeineReactiveCacheManager(ReactiveCacheSupportProperties reactiveCacheSupportProperties) {
return ReactiveCacheManagerBuilder.newCaffeineReactiveManagerBuilder()
.withMaxWaitingDuration(reactiveCacheSupportProperties.getMaxWaitingDuration())
.build();
}

@SuppressWarnings("all")
@ConditionalOnProperty(prefix = PREFIX, value = "type", havingValue = "redis")
@ConditionalOnBean(ReactiveRedisTemplate.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package pro.chenggang.project.reactive.cache.support.configuration.properties;

import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;

import javax.validation.constraints.NotNull;
import java.time.Duration;

/**
Expand All @@ -24,17 +24,17 @@ public class ReactiveCacheSupportProperties {
/**
* The max waiting duration
*/
@NonNull
@NotNull(message = "Max waiting duration cloud not be null")
private Duration maxWaitingDuration = Duration.ofSeconds(3);

/**
* The reactive cache type
*/
@NonNull
private ReactiveCacheType type = ReactiveCacheType.inmemory;
@NotNull(message = "Reactive cache type could not be null")
private ReactiveCacheType type;

/**
* The reactive cache type
* The reactive cache type enum
*/
public enum ReactiveCacheType {

Expand All @@ -43,6 +43,11 @@ public enum ReactiveCacheType {
*/
inmemory,

/**
* The caffeine reactive cache type
*/
caffeine,

/**
* The redis reactive cache type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pro.chenggang.project.reactive.cache.support.core.adapter.ReactiveCacheMonoAdapter;
import pro.chenggang.project.reactive.cache.support.defaults.DefaultReactiveCacheManager;
import pro.chenggang.project.reactive.cache.support.defaults.DefaultReactiveCacheManagerAdapter;
import pro.chenggang.project.reactive.cache.support.defaults.caffeine.CaffeineReactiveCacheManagerAdapter;
import pro.chenggang.project.reactive.cache.support.defaults.inmemory.InmemoryReactiveCacheLock;
import pro.chenggang.project.reactive.cache.support.defaults.inmemory.InmemoryReactiveCacheManagerAdapter;
import pro.chenggang.project.reactive.cache.support.defaults.redis.RedisReactiveCacheLock;
Expand All @@ -35,6 +36,15 @@ public static InmemoryReactiveManagerBuilder newInmemoryReactiveManagerBuilder()
return new InmemoryReactiveManagerBuilder();
}

/**
* New caffeine reactive manager builder.
*
* @return the caffeine reactive manager builder
*/
public static CaffeineReactiveManagerBuilder newCaffeineReactiveManagerBuilder() {
return new CaffeineReactiveManagerBuilder();
}

/**
* New redis reactive manager builder.
*
Expand Down Expand Up @@ -170,6 +180,25 @@ public ReactiveCacheManager build() {
}
}

/**
* The caffeine reactive manager builder.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class CaffeineReactiveManagerBuilder extends BaseReactiveManagerBuilder<CaffeineReactiveManagerBuilder> {

@Override
public CaffeineReactiveManagerBuilder self() {
return this;
}

@Override
public ReactiveCacheManager build() {
return new DefaultReactiveCacheManager(new CaffeineReactiveCacheManagerAdapter(maxWaitingDuration,
new InmemoryReactiveCacheLock()
));
}
}

/**
* The redis reactive manager builder.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package pro.chenggang.project.reactive.cache.support.defaults.caffeine;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import pro.chenggang.project.reactive.cache.support.core.adapter.ReactiveCacheFluxAdapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The inmemory reactive cache flux adapter by using caffeine
*
* @author Gang Cheng
* @version 1.0.0
* @since 1.0.0
*/
@Slf4j
@RequiredArgsConstructor
public class CaffeineReactiveCacheFluxAdapter implements ReactiveCacheFluxAdapter {

private final ConcurrentHashMap<String, Cache<String, ConcurrentLinkedDeque<Object>>> fluxDataCache = new ConcurrentHashMap<>();

@Override
public Mono<Boolean> hasData(@NonNull String cacheKey) {
return Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() ->
fluxDataCache.containsKey(cacheKey)
&&
fluxDataCache.get(cacheKey)
.asMap()
.containsKey(cacheKey)))
);
}

@SuppressWarnings("unchecked")
@Override
public <T> Flux<T> loadData(@NonNull String cacheKey) {
return Flux.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() ->
Optional.ofNullable(fluxDataCache.get(cacheKey)))
)
.flatMap(Mono::justOrEmpty)
.flatMapMany(cache -> Mono.justOrEmpty(cache.getIfPresent(cacheKey))
.flatMapMany(cachedData -> (Flux<T>) Flux.fromIterable(cachedData))
)
);
}

@Override
public <T> Flux<T> cacheData(@NonNull String cacheKey,
@NonNull Duration cacheDuration,
@NonNull Flux<T> sourcePublisher) {
final AtomicBoolean initFlag = new AtomicBoolean(false);
return Flux.zip(sourcePublisher,
sourcePublisher.share()
.concatMap(item -> {
if (initFlag.compareAndSet(false, true)) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> fluxDataCache.compute(
cacheKey,
(key, value) -> {
ConcurrentLinkedDeque<Object> data = new ConcurrentLinkedDeque<>();
data.add(item);
if (Objects.isNull(value)) {
Cache<String, ConcurrentLinkedDeque<Object>> cache = Caffeine.newBuilder()
.expireAfterWrite(cacheDuration)
.build();
cache.put(cacheKey, data);
return cache;
}
value.invalidateAll();
Cache<String, ConcurrentLinkedDeque<Object>> cache = Caffeine.newBuilder()
.expireAfterWrite(cacheDuration)
.build();
cache.put(cacheKey, data);
return cache;
}
)));
}
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> Optional.ofNullable(
fluxDataCache.get(cacheKey))))
.flatMap(Mono::justOrEmpty)
.flatMap(asyncCache -> Mono.justOrEmpty(asyncCache.getIfPresent(cacheKey))
.flatMap(deque -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
deque.add(item);
return true;
})))
);
})
)
.map(Tuple2::getT1);
}

@Override
public Mono<Void> cleanupData(@NonNull String cacheKey) {
return Mono.fromFuture(CompletableFuture.runAsync(() -> {
Cache<String, ConcurrentLinkedDeque<Object>> cache = fluxDataCache.remove(cacheKey);
if (Objects.nonNull(cache)) {
cache.invalidateAll();
}
log.debug("[Caffeine reactive cache flux adapter]Cleanup cached data success, CacheKey: {}", cacheKey);
}));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pro.chenggang.project.reactive.cache.support.defaults.caffeine;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import pro.chenggang.project.reactive.cache.support.core.ReactiveCache;
import pro.chenggang.project.reactive.cache.support.core.adapter.ReactiveCacheManagerAdapter;
import pro.chenggang.project.reactive.cache.support.defaults.DefaultReactiveCache;
import pro.chenggang.project.reactive.cache.support.defaults.inmemory.InmemoryReactiveCacheLock;

import java.time.Duration;

/**
* The inmemory reactive cache manager adapter by using caffeine.
*
* @author Gang Cheng
* @version 1.0.0
* @since 1.0.0
*/
@Slf4j
@RequiredArgsConstructor
public class CaffeineReactiveCacheManagerAdapter implements ReactiveCacheManagerAdapter {

@NonNull
private final Duration maxWaitingDuration;
@NonNull
private final InmemoryReactiveCacheLock inmemoryReactiveCacheLock;

@Override
public ReactiveCache initializeReactiveCache(@NonNull String name) {
return new DefaultReactiveCache(name,
maxWaitingDuration,
inmemoryReactiveCacheLock,
new CaffeineReactiveCacheMonoAdapter(),
new CaffeineReactiveCacheFluxAdapter()
);
}
}
Loading

0 comments on commit 0f66f43

Please sign in to comment.