diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala index 1d947a029537a..e00c7ace6d46a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala @@ -147,13 +147,14 @@ class ProcTimeBoundedRangeOver( // when we find timestamps that are out of interest, we retrieve corresponding elements // and eliminate them. Multiple elements could have been received at the same timestamp // the removal of old elements happens only once per proctime as onTimer is called only once - val iter = rowMapState.keys.iterator + val iter = rowMapState.iterator val markToRemove = new ArrayList[Long]() while (iter.hasNext) { - val elementKey = iter.next + val entry = iter.next() + val elementKey = entry.getKey if (elementKey < limit) { // element key outside of window. Retract values - val elementsRemove = rowMapState.get(elementKey) + val elementsRemove = entry.getValue var iRemove = 0 while (iRemove < elementsRemove.size()) { val retractRow = elementsRemove.get(iRemove) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index 85c523ea4b4fc..b13acdf43ccab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -192,12 +192,13 @@ class RowTimeBoundedRangeOver( val retractTsList: JList[Long] = new JArrayList[Long] // do retraction - val dataTimestampIt = dataState.keys.iterator - while (dataTimestampIt.hasNext) { - val dataTs: Long = dataTimestampIt.next() + val iter = dataState.iterator() + while (iter.hasNext) { + val entry = iter.next() + val dataTs: Long = entry.getKey val offset = timestamp - dataTs if (offset > precedingOffset) { - val retractDataList = dataState.get(dataTs) + val retractDataList = entry.getValue dataListIndex = 0 while (dataListIndex < retractDataList.size()) { val retractRow = retractDataList.get(dataListIndex) diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java index abf2c498a0318..0483c4020036c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java @@ -25,6 +25,7 @@ import org.apache.flink.table.functions.AggregateFunction; import java.util.Iterator; +import java.util.Map; /** * Test aggregator functions. @@ -223,10 +224,12 @@ public void merge(CountDistinctAccum acc, Iterable it) { acc.count += mergeAcc.count; try { - Iterator itrMap = mergeAcc.map.keys().iterator(); + Iterator itrMap = mergeAcc.map.iterator(); while (itrMap.hasNext()) { - String key = itrMap.next(); - Integer cnt = mergeAcc.map.get(key); + Map.Entry entry = + (Map.Entry) itrMap.next(); + String key = entry.getKey(); + Integer cnt = entry.getValue(); if (acc.map.contains(key)) { acc.map.put(key, acc.map.get(key) + cnt); } else { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala index 9060db5300e0a..1d8b50427471a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala @@ -85,10 +85,11 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[ val splits = user.split("#") if (null != data) { if (null != conf && conf.size > 0) { - val it = conf.keys.iterator - while (it.hasNext) { - val key = it.next() - val value = conf.get(key).get + val iter = conf.iterator + while (iter.hasNext) { + val entry = iter.next() + val key = entry._1 + val value = entry._2 collect( SimpleUser( data.concat("_key=")