Skip to content
Permalink
Browse files

Add ExpiringCacheAsync cache and test (#3932)

* Add ExpiringCacheAsync cache and test.
* Add documentation for caches.

Signed-off-by: David Gräff <david.graeff@web.de>

* ExpiringCache(Async): Fix license header. Fix nullable annotations.

Signed-off-by: David Gräff <david.graeff@web.de>
  • Loading branch information...
davidgraeff authored and kaikreuzer committed Sep 6, 2017
1 parent 74880d0 commit bad3b144842a530a5390eda13e62d14b18b48274
@@ -0,0 +1,97 @@
/**
* Copyright (c) 2010-2017 by the respective copyright holders.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.smarthome.core.cache;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import org.junit.Test;

/**
* Tests cases for {@link ExpiringAsyncCache}.
*
* @author David Graeff - Initial contribution
*/
public class ExpiringCacheAsyncTest {
double theValue = 0;

@Test(expected = IllegalArgumentException.class)
public void testConstructorWrongCacheTime() {
// Fail if cache time is <= 0
new ExpiringCacheAsync<Double>(0);
}

@Test
public void testFetchValue() throws InterruptedException, ExecutionException {
ExpiringCacheAsync<Double> t = new ExpiringCacheAsync<Double>(100);
assertTrue(t.isExpired());
// We should always be able to get the raw value, expired or not
assertNull(t.getLastKnownValue());

// Define a supplier which returns a future that is immediately completed.
@SuppressWarnings({ "unchecked", "null" })
Supplier<CompletableFuture<Double>> s = mock(Supplier.class);
when(s.get()).thenReturn(CompletableFuture.completedFuture(10.0));

// We expect an immediate result with the value 10.0
assertEquals(10.0, t.getValue(s).get(), 0.0);
// The value should be valid
assertFalse(t.isExpired());

// We expect an immediate result with the value 10.0, but not additional call to the supplier
assertEquals(10.0, t.getValue(s).get(), 0.0);
verify(s, times(1)).get();

// Wait
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
return;
}
// We expect an immediate result with the value 10.0, and an additional call to the supplier
assertEquals(10.0, t.getValue(s).get(), 0.0);
verify(s, times(2)).get();

// We should always be able to get the raw value, expired or not
t.invalidateValue();
assertEquals(10.0, t.getLastKnownValue(), 0.0);
assertTrue(t.isExpired());
}

@Test
public void testMutipleGetsWhileRefetching() {
ExpiringCacheAsync<Double> t = new ExpiringCacheAsync<Double>(100);

CompletableFuture<Double> v = new CompletableFuture<Double>();

// Define a supplier which returns a future that is not yet completed
Supplier<CompletableFuture<Double>> s = () -> v;

assertNull(t.currentNewValueRequest);

// Multiple get requests while the cache is still refreshing
CompletableFuture<Double> result1 = t.getValue(s);
CompletableFuture<Double> result2 = t.getValue(s);
assertFalse(result1.isDone());
assertFalse(result2.isDone());
result1.thenAccept(newValue -> theValue = newValue);
assertNotNull(t.currentNewValueRequest);

// The refresh is finally done
v.complete(10.0);
assertEquals(10.0, theValue, 0.0);
assertTrue(result1.isDone());
assertTrue(result2.isDone());
assertNull(t.currentNewValueRequest);
}
}
@@ -10,8 +10,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jdt.annotation.Nullable;

/**
* This is a simple expiring and reloading cache implementation.
@@ -20,21 +19,19 @@
* answer from the last calculation is not valid anymore, i.e. if it is expired.
*
* @author Christoph Weitkamp - Initial contribution and API.
*
*
* @param <V> the type of the value
*/
public class ExpiringCache<V> {

private final Logger logger = LoggerFactory.getLogger(ExpiringCache.class);

private final long expiry;
private final Supplier<V> action;
@Nullable
private V value;
private long expiresAt;

/**
* Create a new instance.
*
*
* @param expiry the duration in milliseconds for how long the value stays valid
* @param action the action to retrieve/calculate the value
*/
@@ -51,9 +48,8 @@ public ExpiringCache(long expiry, Supplier<V> action) {

/**
* Returns the value - possibly from the cache, if it is still valid.
*
* @return the value
*/
@Nullable
public synchronized V getValue() {
if (value == null || isExpired()) {
return refreshValue();
@@ -71,9 +67,10 @@ public synchronized void invalidateValue() {

/**
* Refreshes and returns the value in the cache.
*
*
* @return the new value
*/
@Nullable
public synchronized V refreshValue() {
value = action.get();
expiresAt = System.nanoTime() + expiry;
@@ -82,7 +79,7 @@ public synchronized V refreshValue() {

/**
* Checks if the value is expired.
*
*
* @return true if the value is expired
*/
public boolean isExpired() {
@@ -0,0 +1,132 @@
/**
* Copyright (c) 2010-2017 by the respective copyright holders.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.smarthome.core.cache;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;

/**
* Complementary class to {@link org.eclipse.smarthome.core.cache.ExpiringCache}, implementing an asynchronous variant
* of an expiring cache. An instance returns the cached value immediately to the callback if not expired yet, otherwise
* issue a fetch in another thread and notify callback implementors asynchronously.
*
* @author David Graeff - Initial contribution
*
* @param <V> the type of the cached value
*/
public class ExpiringCacheAsync<V> {
protected final long expiry;
protected long expiresAt = 0;
protected CompletableFuture<V> currentNewValueRequest = null;
@Nullable
protected V value;

/**
* Create a new instance.
*
* @param expiry the duration in milliseconds for how long the value stays valid. Must be greater than 0.
* @throws IllegalArgumentException For an expire value <=0.
*/
public ExpiringCacheAsync(long expiry) throws IllegalArgumentException {
if (expiry <= 0) {
throw new IllegalArgumentException("Cache expire time must be greater than 0");
}
this.expiry = TimeUnit.MILLISECONDS.toNanos(expiry);
}

/**
* Returns the value - possibly from the cache, if it is still valid.
*
* @param requestNewValueFuture If the value is expired, this supplier is called to supply the cache with a future
* that on completion will update the cached value
* @return the value in form of a CompletableFuture. You can for instance use it this way:
* `getValue().thenAccept(value->useYourValueHere(value));`. If you need the value synchronously you can use
* `getValue().get()`.
*/
@SuppressWarnings("null")
public CompletableFuture<V> getValue(@NonNull Supplier<CompletableFuture<V>> requestNewValueFuture) {
if (isExpired()) {
return refreshValue(requestNewValueFuture);
} else {
return CompletableFuture.completedFuture(value);
}
}

/**
* Invalidates the value in the cache.
*/
public void invalidateValue() {
expiresAt = 0;
}

/**
* Returns an arbitrary time reference in nanoseconds.
* This is used for the cache to determine if a value has expired.
*/
protected long getCurrentNanoTime() {
return System.nanoTime();
}

/**
* Refreshes and returns the value asynchronously. Use the return value like with getValue() to get the refreshed
* value.
*
* @param requestNewValueFuture This supplier is called to supply the cache with a future
* that on completion will update the cached value. The supplier will not be used,
* if there is already an ongoing refresh.
* @return the new value in form of a CompletableFuture.
*/
public synchronized @NonNull CompletableFuture<V> refreshValue(
@NonNull Supplier<CompletableFuture<V>> requestNewValueFuture) {
expiresAt = 0;
// There is already an ongoing refresh, just return that future
if (currentNewValueRequest != null) {
return currentNewValueRequest;
}
// We request a value update from the supplier now
currentNewValueRequest = requestNewValueFuture.get();
if (currentNewValueRequest == null) {
throw new IllegalArgumentException("We expect a CompletableFuture for refreshValue() to work!");
}
@SuppressWarnings("null")
CompletableFuture<V> t = currentNewValueRequest.thenApply(newValue -> {
// No request is ongoing anymore, update the value and expire time
currentNewValueRequest = null;
value = newValue;
expiresAt = getCurrentNanoTime() + expiry;
return value;
});
// The @NonNull annotation forces us to check the return value of thenApply.
if (t == null) {
throw new IllegalArgumentException("We expect a CompletableFuture for refreshValue() to work!");
}
return t;
}

/**
* Checks if the value is expired.
*
* @return true if the value is expired
*/
public boolean isExpired() {
return expiresAt < getCurrentNanoTime();
}

/**
* Return the raw value, no matter if it is already
* expired or still valid.
*/
@Nullable
public V getLastKnownValue() {
return value;
}
}
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.2.0" activate="activate" configuration-pid="org.eclipse.smarthome.mqtt" deactivate="deactivate" immediate="true" modified="modified" name="MQTTConnectionService">
<property name="service.pid" value="org.eclipse.smarthome.mqtt"/>
<service>
<provide interface="org.eclipse.smarthome.io.transport.mqtt.MqttService"/>
</service>
<reference bind="setEventPublisher" cardinality="1..1" interface="org.eclipse.smarthome.core.events.EventPublisher" name="EventPublisher" policy="dynamic" unbind="unsetEventPublisher"/>
<implementation class="org.eclipse.smarthome.io.transport.mqtt.MqttService"/>
</scr:component>
@@ -14,3 +14,72 @@ One can obtain the configured address via the `getPrimaryIpv4HostAddress()` meth
This service is useful for example in the `ThingHandlerFactory` or an `AudioSink` where one needs a specific IP address of the host system to provide something like a `callback` URL.

Some static methods like `getAllBroadcastAddresses()` for retrieving all interface broadcast addresses or `getInterfaceAddresses()` for retrieving all assigned interface addresses might be usefull as well for discovery services.

## Caching

The framework provides some caching solutions for common scenarios.

### Simple expiring and reloading cache

A common usage case is in a `ThingHandler` to encapsulate one value of an internal state and attach an expire time on that value. A cache action will be called to refresh the value if it is expired. This is what `ExpiringCache` implements. If `handleCommand(ChannelUID channelUID, Command command)` is called with the "RefreshType" command, you just return `cache.getValue()`.

It is a good practice to return as fast as possible from the `handleCommand(ChannelUID channelUID, Command command)` method to not block callers especially UIs.
Use this type of cache only, if your refresh action is a quick to compute, blocking operation. If you deal with network calls, consider the asynchronously reloading cache implementation instead.

### Expiring and asynchronously reloading cache

If we refreshed a value of the internal state in a `ThingHandler` just recently, we can return it immediately via the usual `updateState(channel, state)` method in response to a "RefreshType" command.
If the state is too old, we need to fetch it first and this may involve network calls, interprocess operations or anything else that will would block for a considerable amout of time.

A common usage case of the `ExpiringCacheAsync` cache type is in a `ThingHandler` to encapsulate one value of an internal state and attach an expire time on that value.


A **handleCommand** implementation with the interesting *RefreshType* could look like this:
```
public void handleCommand(ChannelUID channelUID, Command command) {
if (command instanceof RefreshType) {
switch (channelUID.getId()) {
case CHANNEL_1:
cache1.getValue(updater).thenAccept(value -> updateState(CHANNEL_1, value));
break;
...
}
}
}
```

The interesting part is the `updater`. If the value is not yet expired, the returned CompletableFuture will complete immediately and the given code is executed.
If the value is expired, the updater will be used to request a refreshed value.

An updater can be any class or lambda that implements the funtional interface of `Supplier<CompletableFuture<VALUE_TYPE>>`.

In the following example the method `CompletableFuture<VALUE_TYPE> get()` is accordingly implemented. The example assumes that we deal
with a still very common callback based device refreshing method `doSuperImportantAsyncStuffHereToGetRefreshedValue(listener)`. The listener is the class
itself, which implements `DeviceStateUpdateListener`. We will be called back with a refreshed device state in `asyncCallbackFromDeviceStateRefresh`
and mark the Future as *complete*.

```
class FetchValueFromDevice implements Supplier<CompletableFuture<double>>, DeviceStateUpdateListener {
CompletableFuture<double> c;
@Override
CompletableFuture<double> get() {
if (c != null) {
c = new CompletableFuture<double>();
doSuperImportantAsyncStuffHereToGetRefreshedValue( (DeviceStateUpdateListener)this );
}
return c;
}
// Here you process the callback from your device refresh method
@Override
void asyncCallbackFromDeviceStateRefresh(double newValue) {
// Notify the future that we have something
if (c != null) {
c.complete(newValue);
c = null;
}
}
}
```
If you deal with a newer implementation with a CompletableFuture support, it is even easier. You would just return your CompletableFuture.

0 comments on commit bad3b14

Please sign in to comment.
You can’t perform that action at this time.