Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no…
… java stream API for performance
  • Loading branch information
azagrebin committed Sep 12, 2018
1 parent 1ae5983 commit 312b665
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
Expand Up @@ -23,13 +23,13 @@
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* This class wraps list state with TTL logic.
Expand Down Expand Up @@ -73,11 +73,14 @@ public Iterable<T> get() throws Exception {
return () -> new IteratorWithCleanup(finalResult.iterator());
}

private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
.filter(v -> !expired(v))
.map(this::rewrapWithNewTs)
.collect(Collectors.toList());
private void updateTs(List<TtlValue<T>> ttlValues) throws Exception {
List<TtlValue<T>> unexpiredWithUpdatedTs = new ArrayList<>();
long currentTimestamp = timeProvider.currentTimestamp();
for (TtlValue<T> ttlValue : ttlValues) {
if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
unexpiredWithUpdatedTs.add(TtlUtils.wrapWithTs(ttlValue.getUserValue(), currentTimestamp));
}
}
if (!unexpiredWithUpdatedTs.isEmpty()) {
original.update(unexpiredWithUpdatedTs);
}
Expand Down Expand Up @@ -105,8 +108,15 @@ public List<T> getInternal() throws Exception {
}

private <E> List<E> collect(Iterable<E> iterable) {
return iterable instanceof List ? (List<E>) iterable :
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if (iterable instanceof List) {
return (List<E>) iterable;
} else {
List<E> list = new ArrayList<>();
for (E element : iterable) {
list.add(element);
}
return list;
}
}

@Override
Expand Down
Expand Up @@ -21,11 +21,19 @@
/** Common functions related to State TTL. */
class TtlUtils {
static <V> boolean expired(TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, timeProvider);
return expired(ttlValue, ttl, timeProvider.currentTimestamp());
}

static <V> boolean expired(TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
}

static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
return getExpirationTimestamp(ts, ttl) <= timeProvider.currentTimestamp();
return expired(ts, ttl, timeProvider.currentTimestamp());
}

private static boolean expired(long ts, long ttl, long currentTimestamp) {
return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
}

private static long getExpirationTimestamp(long ts, long ttl) {
Expand Down

0 comments on commit 312b665

Please sign in to comment.