Skip to content

Commit

Permalink
[FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no…
Browse files Browse the repository at this point in the history
… java stream API for performance

This closes #6683.
  • Loading branch information
azagrebin authored and tillrohrmann committed Sep 14, 2018
1 parent 4fdba85 commit 2e21582
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 14 deletions.
Expand Up @@ -23,13 +23,12 @@
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* This class wraps list state with TTL logic.
Expand Down Expand Up @@ -73,11 +72,14 @@ public Iterable<T> get() throws Exception {
return () -> new IteratorWithCleanup(finalResult.iterator());
}

private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
.filter(v -> !expired(v))
.map(this::rewrapWithNewTs)
.collect(Collectors.toList());
private void updateTs(List<TtlValue<T>> ttlValues) throws Exception {
List<TtlValue<T>> unexpiredWithUpdatedTs = new ArrayList<>(ttlValues.size());
long currentTimestamp = timeProvider.currentTimestamp();
for (TtlValue<T> ttlValue : ttlValues) {
if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
unexpiredWithUpdatedTs.add(TtlUtils.wrapWithTs(ttlValue.getUserValue(), currentTimestamp));
}
}
if (!unexpiredWithUpdatedTs.isEmpty()) {
original.update(unexpiredWithUpdatedTs);
}
Expand Down Expand Up @@ -105,8 +107,15 @@ public List<T> getInternal() throws Exception {
}

private <E> List<E> collect(Iterable<E> iterable) {
return iterable instanceof List ? (List<E>) iterable :
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if (iterable instanceof List) {
return (List<E>) iterable;
} else {
List<E> list = new ArrayList<>();
for (E element : iterable) {
list.add(element);
}
return list;
}
}

@Override
Expand All @@ -116,7 +125,12 @@ public void updateInternal(List<T> valueToStore) throws Exception {
}

private List<TtlValue<T>> withTs(List<T> values) {
return values.stream().map(this::wrapWithTs).collect(Collectors.toList());
List<TtlValue<T>> withTs = new ArrayList<>(values.size());
for (T value : values) {
Preconditions.checkNotNull(value, "You cannot have null element in a ListState.");
withTs.add(wrapWithTs(value));
}
return withTs;
}

private class IteratorWithCleanup implements Iterator<T> {
Expand Down
Expand Up @@ -68,9 +68,10 @@ public void putAll(Map<UK, UV> map) throws Exception {
return;
}
Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
long currentTimestamp = timeProvider.currentTimestamp();
for (Map.Entry<UK, UV> entry : map.entrySet()) {
UK key = entry.getKey();
ttlMap.put(key, wrapWithTs(entry.getValue()));
ttlMap.put(key, TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp));
}
original.putAll(ttlMap);
}
Expand Down
Expand Up @@ -18,14 +18,24 @@

package org.apache.flink.runtime.state.ttl;

import javax.annotation.Nullable;

/** Common functions related to State TTL. */
class TtlUtils {
static <V> boolean expired(TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, timeProvider);
static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
return expired(ttlValue, ttl, timeProvider.currentTimestamp());
}

static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
}

static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
return getExpirationTimestamp(ts, ttl) <= timeProvider.currentTimestamp();
return expired(ts, ttl, timeProvider.currentTimestamp());
}

private static boolean expired(long ts, long ttl, long currentTimestamp) {
return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
}

private static long getExpirationTimestamp(long ts, long ttl) {
Expand Down
Expand Up @@ -28,6 +28,8 @@
* @param <T> Type of the user value of state with TTL
*/
class TtlValue<T> implements Serializable {
private static final long serialVersionUID = 5221129704201125020L;

private final T userValue;
private final long lastAccessTimestamp;

Expand Down

0 comments on commit 2e21582

Please sign in to comment.