diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java index cb64df70dd2ed2..c648c205e4bf77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java @@ -23,13 +23,13 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * This class wraps list state with TTL logic. @@ -73,11 +73,14 @@ public Iterable get() throws Exception { return () -> new IteratorWithCleanup(finalResult.iterator()); } - private void updateTs(List> ttlValue) throws Exception { - List> unexpiredWithUpdatedTs = ttlValue.stream() - .filter(v -> !expired(v)) - .map(this::rewrapWithNewTs) - .collect(Collectors.toList()); + private void updateTs(List> ttlValues) throws Exception { + List> unexpiredWithUpdatedTs = new ArrayList<>(); + long currentTimestamp = timeProvider.currentTimestamp(); + for (TtlValue ttlValue : ttlValues) { + if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) { + unexpiredWithUpdatedTs.add(TtlUtils.wrapWithTs(ttlValue.getUserValue(), currentTimestamp)); + } + } if (!unexpiredWithUpdatedTs.isEmpty()) { original.update(unexpiredWithUpdatedTs); } @@ -105,8 +108,15 @@ public List getInternal() throws Exception { } private List collect(Iterable iterable) { - return iterable instanceof List ? (List) iterable : - StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + if (iterable instanceof List) { + return (List) iterable; + } else { + List list = new ArrayList<>(); + for (E element : iterable) { + list.add(element); + } + return list; + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java index 9d9e5e1a10d99a..96fdff64d3fdca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java @@ -21,11 +21,19 @@ /** Common functions related to State TTL. */ class TtlUtils { static boolean expired(TtlValue ttlValue, long ttl, TtlTimeProvider timeProvider) { - return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, timeProvider); + return expired(ttlValue, ttl, timeProvider.currentTimestamp()); + } + + static boolean expired(TtlValue ttlValue, long ttl, long currentTimestamp) { + return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp); } static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) { - return getExpirationTimestamp(ts, ttl) <= timeProvider.currentTimestamp(); + return expired(ts, ttl, timeProvider.currentTimestamp()); + } + + private static boolean expired(long ts, long ttl, long currentTimestamp) { + return getExpirationTimestamp(ts, ttl) <= currentTimestamp; } private static long getExpirationTimestamp(long ts, long ttl) {