Skip to content

Commit

Permalink
Create index of timestamps to speed up streamSince() (#26)
Browse files Browse the repository at this point in the history
* Create index of timestamps to speed up streamSince()

* Use TreeMap

ConcurrentSkipListMap throws ConcurrentModificationException

* Update last updated index to use multimap.

* Use BitSet instead of Multimap.
  • Loading branch information
Asgeir Storesund Nilsen committed Sep 10, 2020
1 parent 643b1b8 commit f75479c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
50 changes: 27 additions & 23 deletions src/main/java/no/fint/cache/FintCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.io.Serializable;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -19,14 +20,13 @@
@Slf4j
public class FintCache<T extends Serializable> implements Cache<T>, Serializable {
@Getter
private final CacheMetaData cacheMetaData;
private CacheMetaData cacheMetaData;
private List<CacheObject<T>> cacheObjects;
private Map<Integer, Index> index;
private NavigableMap<Long, BitSet> lastUpdatedIndex;

/**
 * Creates an empty cache.
 *
 * NOTE(review): this is a rendered diff without +/- markers. Going by the hunk header
 * (14 old lines, 13 new lines), the three direct assignments below appear to be the
 * removed side and the flush() call their replacement — confirm against the repository.
 *
 * NOTE(review): flush() is a public, overridable method and this class is not final;
 * calling it from the constructor means a subclass override would run before the
 * subclass has been initialised.
 */
public FintCache() {
cacheMetaData = new CacheMetaData();
cacheObjects = Collections.emptyList();
index = Collections.emptyMap();
// Delegates all field initialisation to flush().
flush();
}

@Override
Expand Down Expand Up @@ -93,8 +93,10 @@ public void addCache(List<CacheObject<T>> objects) {

/**
 * Empties the cache: resets the metadata, the object list, the hash-code index and the
 * last-updated index.
 *
 * NOTE(review): this is a rendered diff without +/- markers. Going by the hunk header
 * (8 old lines, 10 new lines), flushMetaData() appears to be the removed line, while the
 * cacheMetaData and lastUpdatedIndex assignments are the added ones — confirm against
 * the repository.
 */
@Override
public void flush() {
// Pre-change: zeroed the fields of the existing CacheMetaData instance in place.
flushMetaData();
// Post-change: replaces the metadata object with a fresh instance instead.
cacheMetaData = new CacheMetaData();
cacheObjects = Collections.emptyList();
index = Collections.emptyMap();
// The timestamp index starts out as an immutable empty map, like the other collections here.
lastUpdatedIndex = Collections.emptyNavigableMap();
}

@Override
Expand All @@ -108,34 +110,42 @@ public List<T> getSourceList() {

/**
 * Streams the cache objects whose last-updated timestamp is strictly greater than
 * {@code timestamp}.
 *
 * NOTE(review): this is a rendered diff without +/- markers; the first return statement
 * is the pre-change implementation and the second one is its replacement.
 *
 * @param timestamp exclusive lower bound, compared against CacheObject.getLastUpdated()
 * @return a stream of the matching cache objects
 */
@Override
public Stream<CacheObject<T>> streamSince(long timestamp) {
// Pre-change: scans every cached object and filters on its timestamp.
return cacheObjects.stream().filter(cacheObject -> (cacheObject.getLastUpdated() > timestamp));
// Post-change: looks the matches up in the timestamp index instead of scanning.
return lastUpdatedIndex
// inclusive = false keeps the bound exclusive, matching the '>' of the old filter
.tailMap(timestamp, false)
.values()
.stream()
// each BitSet holds positions into cacheObjects (set in updateMetaData)
.flatMapToInt(BitSet::stream)
.mapToObj(cacheObjects::get);
}

/**
 * Returns the wrapped source objects of the cache entries updated since {@code timestamp}.
 *
 * NOTE(review): this is a rendered diff without +/- markers; the first return statement
 * is the pre-change implementation and the second one is its replacement.
 *
 * NOTE(review): the pre-change filter is inclusive ({@code >= timestamp}), whereas the
 * replacement delegates to streamSince(), which uses an exclusive bound. Entries whose
 * timestamp equals {@code timestamp} are therefore no longer returned — confirm that
 * this boundary change is intended.
 *
 * @param timestamp lower bound for CacheObject.getLastUpdated()
 * @return list of the objects obtained via CacheObject.getObject()
 */
public List<?> getSourceListSince(long timestamp) {
return cacheObjects
.stream()
.filter(cacheObject -> (cacheObject.getLastUpdated() >= timestamp))
.map(CacheObject::getObject)
.collect(Collectors.toList());
return streamSince(timestamp).map(CacheObject::getObject).collect(Collectors.toList());
}

/**
 * Rebuilds the lookup indexes from the current cacheObjects list and refreshes the
 * metadata (object count, last-updated time set to now, summed size).
 *
 * NOTE(review): this is a rendered diff without +/- markers; the inline compute lambda
 * is the pre-change code and the createIndex(i) call on the following line is its
 * replacement.
 */
private void updateMetaData() {
Map<Integer, Index> newIndex = new HashMap<>();
// Maps a last-updated timestamp to the list positions of the objects carrying it.
NavigableMap<Long, BitSet> newLastUpdatedIndex = new TreeMap<>();
cacheMetaData.setCacheCount(cacheObjects.size());
cacheMetaData.setLastUpdated(System.currentTimeMillis());
// A ListIterator is used so the position of each element is available via nextIndex().
ListIterator<CacheObject<T>> iterator = cacheObjects.listIterator();
while (iterator.hasNext()) {
int i = iterator.nextIndex();
CacheObject<T> it = iterator.next();
// Pre-change: registers position i under every hash code of the object.
IntStream.of(it.getHashCodes()).forEach(key -> newIndex.compute(key, (k, v) -> {
if (v == null) {
return new SingleIndex(i);
}
return v.add(i);
}));
// Post-change: same registration, with the remapping function built by createIndex.
IntStream.of(it.getHashCodes()).forEach(key -> newIndex.compute(key, createIndex(i)));
// Marks position i in the BitSet for this object's timestamp, creating it on first use.
newLastUpdatedIndex.computeIfAbsent(it.getLastUpdated(), k -> new BitSet()).set(i);
}
cacheMetaData.setSize(cacheObjects.parallelStream().mapToLong(CacheObject::getSize).sum());
// The indexes are built in local maps and only assigned to the fields once complete.
index = newIndex;
lastUpdatedIndex = newLastUpdatedIndex;
}

/**
 * Builds the remapping function handed to {@code Map.compute} when list position
 * {@code i} is registered under a key.
 *
 * @param i   position to register
 * @param <K> key type of the map being updated; the key itself is not inspected
 * @return a function that yields a new SingleIndex for {@code i} when the key has no
 *         entry yet, and otherwise the result of calling add(i) on the existing entry
 */
private static <K> BiFunction<K, Index, Index> createIndex(int i) {
    return (key, existing) -> existing == null ? new SingleIndex(i) : existing.add(i);
}

private Map<String, CacheObject<T>> getMap(List<T> list) {
Expand All @@ -146,12 +156,6 @@ private Map<String, CacheObject<T>> getCacheMap(List<CacheObject<T>> list) {
return list.parallelStream().collect(Collectors.toMap(CacheObject::getChecksum, Function.identity(), (a, b) -> b));
}

/**
 * Resets the cache metadata in place: count, last-updated timestamp and size are all set
 * to zero.
 *
 * NOTE(review): this is a rendered diff without +/- markers. The hunk header (12 old
 * lines, 6 new lines) indicates that this method is on the removed side of the change —
 * confirm against the repository.
 */
private void flushMetaData() {
cacheMetaData.setCacheCount(0);
cacheMetaData.setLastUpdated(0);
cacheMetaData.setSize(0L);
}

@Override
public long getLastUpdated() {
return cacheMetaData.getLastUpdated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class FintCacheIntegrationSpec extends Specification {
def result = testCacheService.streamSliceSince('rogfk.no', System.currentTimeMillis() - 500, 1, 1).collect(Collectors.toList())

then:
result == ['test4']
result.size() == 1
result.every { it.startsWith('test')}
}
}

0 comments on commit f75479c

Please sign in to comment.