Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ class ProcTimeBoundedRangeOver(
// when we find timestamps that are out of interest, we retrieve corresponding elements
// and eliminate them. Multiple elements could have been received at the same timestamp
// the removal of old elements happens only once per proctime as onTimer is called only once
val iter = rowMapState.keys.iterator
val iter = rowMapState.iterator
val markToRemove = new ArrayList[Long]()
while (iter.hasNext) {
val elementKey = iter.next
val entry = iter.next()
val elementKey = entry.getKey
if (elementKey < limit) {
// element key outside of window. Retract values
val elementsRemove = rowMapState.get(elementKey)
val elementsRemove = entry.getValue
var iRemove = 0
while (iRemove < elementsRemove.size()) {
val retractRow = elementsRemove.get(iRemove)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,13 @@ class RowTimeBoundedRangeOver(
val retractTsList: JList[Long] = new JArrayList[Long]

// do retraction
val dataTimestampIt = dataState.keys.iterator
while (dataTimestampIt.hasNext) {
val dataTs: Long = dataTimestampIt.next()
val iter = dataState.iterator()
while (iter.hasNext) {
val entry = iter.next()
val dataTs: Long = entry.getKey
val offset = timestamp - dataTs
if (offset > precedingOffset) {
val retractDataList = dataState.get(dataTs)
val retractDataList = entry.getValue
dataListIndex = 0
while (dataListIndex < retractDataList.size()) {
val retractRow = retractDataList.get(dataListIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.functions.AggregateFunction;

import java.util.Iterator;
import java.util.Map;

/**
* Test aggregator functions.
Expand Down Expand Up @@ -223,10 +224,12 @@ public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
acc.count += mergeAcc.count;

try {
Iterator<String> itrMap = mergeAcc.map.keys().iterator();
Iterator itrMap = mergeAcc.map.iterator();
while (itrMap.hasNext()) {
String key = itrMap.next();
Integer cnt = mergeAcc.map.get(key);
Map.Entry<String, Integer> entry =
(Map.Entry<String, Integer>) itrMap.next();
String key = entry.getKey();
Integer cnt = entry.getValue();
if (acc.map.contains(key)) {
acc.map.put(key, acc.map.get(key) + cnt);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
val splits = user.split("#")
if (null != data) {
if (null != conf && conf.size > 0) {
val it = conf.keys.iterator
while (it.hasNext) {
val key = it.next()
val value = conf.get(key).get
val iter = conf.iterator
while (iter.hasNext) {
val entry = iter.next()
val key = entry._1
val value = entry._2
collect(
SimpleUser(
data.concat("_key=")
Expand Down